diff --git a/cpp/src/arrow/python/api.h b/cpp/src/arrow/python/api.h
index 7cb36ad636fc9..4ceb3f1a45dc1 100644
--- a/cpp/src/arrow/python/api.h
+++ b/cpp/src/arrow/python/api.h
@@ -19,11 +19,13 @@
 #define ARROW_PYTHON_API_H
 
 #include "arrow/python/arrow_to_pandas.h"
+#include "arrow/python/arrow_to_python.h"
 #include "arrow/python/builtin_convert.h"
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/io.h"
 #include "arrow/python/numpy_convert.h"
 #include "arrow/python/pandas_to_arrow.h"
+#include "arrow/python/python_to_arrow.h"
 
 #endif  // ARROW_PYTHON_API_H
diff --git a/cpp/src/arrow/python/arrow_to_python.cc b/cpp/src/arrow/python/arrow_to_python.cc
index 6e94f5347c6c4..1c983827d2a47 100644
--- a/cpp/src/arrow/python/arrow_to_python.cc
+++ b/cpp/src/arrow/python/arrow_to_python.cc
@@ -19,6 +19,7 @@
 
 #include "arrow/util/logging.h"
 
+#include "arrow/ipc/reader.h"
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/numpy_convert.h"
@@ -183,5 +184,26 @@ Status DeserializeTuple(std::shared_ptr<Array> array, int32_t start_idx, int32_t
                         PyTuple_SetItem, out);
 }
 
+Status ReadSerializedPythonSequence(std::shared_ptr<io::RandomAccessFile> src,
+                                    std::shared_ptr<RecordBatch>* batch_out,
+                                    std::vector<std::shared_ptr<Tensor>>* tensors_out) {
+  std::shared_ptr<ipc::RecordBatchStreamReader> reader;
+  int64_t offset;
+  int64_t bytes_read;
+  int32_t num_tensors;
+  // Read number of tensors
+  RETURN_NOT_OK(
+      src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
+  RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
+  RETURN_NOT_OK(reader->ReadNextRecordBatch(batch_out));
+  RETURN_NOT_OK(src->Tell(&offset));
+  for (int i = 0; i < num_tensors; ++i) {
+    std::shared_ptr<Tensor> tensor;
+    RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor));
+    tensors_out->push_back(tensor);
+    RETURN_NOT_OK(src->Tell(&offset));
+  }
+  return Status::OK();
+}
+
 }  // namespace py
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/arrow_to_python.h b/cpp/src/arrow/python/arrow_to_python.h
index 92752a1694c70..8649e877428b2 100644
--- a/cpp/src/arrow/python/arrow_to_python.h
+++ b/cpp/src/arrow/python/arrow_to_python.h
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "arrow/api.h"
+#include "arrow/io/interfaces.h"
 
 #include <Python.h>
@@ -39,6 +40,10 @@ Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t
                        const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** out);
 
+Status ReadSerializedPythonSequence(std::shared_ptr<io::RandomAccessFile> src,
+                                    std::shared_ptr<RecordBatch>* batch_out,
+                                    std::vector<std::shared_ptr<Tensor>>* tensors_out);
+
 }  // namespace py
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc
index 99cf8e8beff18..6ef0b7d9f05d2 100644
--- a/cpp/src/arrow/python/python_to_arrow.cc
+++ b/cpp/src/arrow/python/python_to_arrow.cc
@@ -22,7 +22,7 @@
 #include <sstream>
 #include <vector>
 
-#include "arrow/api.h"
+#include "arrow/ipc/writer.h"
 #include "arrow/python/common.h"
 #include "arrow/python/helpers.h"
 #include "arrow/python/numpy_interop.h"
@@ -39,6 +39,40 @@ PyObject* pyarrow_deserialize_callback = NULL;
 namespace arrow {
 namespace py {
 
+/// Constructing dictionaries of key/value pairs. Sequences of
+/// keys and values are built separately using a pair of
+/// SequenceBuilders. The resulting Arrow representation
+/// can be obtained via the Finish method.
+class DictBuilder {
+ public:
+  explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {}
+
+  /// Builder for the keys of the dictionary
+  SequenceBuilder& keys() { return keys_; }
+  /// Builder for the values of the dictionary
+  SequenceBuilder& vals() { return vals_; }
+
+  /// Construct an Arrow StructArray representing the dictionary.
+  /// Contains a field "keys" for the keys and "vals" for the values.
+
+  /// \param list_data
+  ///    List containing the data from nested lists in the value
+  ///    list of the dictionary
+  ///
+  /// \param dict_data
+  ///    List containing the data from nested dictionaries in the
+  ///    value list of the dictionary
+  arrow::Status Finish(std::shared_ptr<Array> key_tuple_data,
+                       std::shared_ptr<Array> key_dict_data,
+                       std::shared_ptr<Array> val_list_data,
+                       std::shared_ptr<Array> val_tuple_data,
+                       std::shared_ptr<Array> val_dict_data, std::shared_ptr<Array>* out);
+
+ private:
+  SequenceBuilder keys_;
+  SequenceBuilder vals_;
+};
+
 Status DictBuilder::Finish(std::shared_ptr<Array> key_tuple_data,
                            std::shared_ptr<Array> key_dict_data,
                            std::shared_ptr<Array> val_list_data,
@@ -89,6 +123,14 @@ Status CallCustomSerializationCallback(PyObject* elem, PyObject** serialized_obj
   return Status::OK();
 }
 
+Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
+                     std::shared_ptr<Array>* out,
+                     std::vector<PyObject*>* tensors_out);
+
+Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
+                      std::vector<PyObject*>* subdicts,
+                      std::vector<PyObject*>* tensors_out);
+
 Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
   if (PyArray_IsScalar(obj, Bool)) {
     return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
@@ -335,5 +377,25 @@ std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
   return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
 }
 
+Status WriteSerializedPythonSequence(std::shared_ptr<RecordBatch> batch,
+                                     std::vector<std::shared_ptr<Tensor>> tensors,
+                                     io::OutputStream* dst) {
+  int32_t num_tensors = tensors.size();
+  std::shared_ptr<ipc::RecordBatchStreamWriter> writer;
+  int32_t metadata_length;
+  int64_t body_length;
+
+  RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_tensors), sizeof(int32_t)));
+  RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(dst, batch->schema(), &writer));
+  RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
+  RETURN_NOT_OK(writer->Close());
+
+  for (const auto& tensor : tensors) {
+    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
+  }
+
+  return Status::OK();
+}
+
 }  // namespace py
 }  // namespace arrow
diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h
index c4b4468929fdb..ed6ed4f41dc88 100644
--- a/cpp/src/arrow/python/python_to_arrow.h
+++ b/cpp/src/arrow/python/python_to_arrow.h
@@ -21,7 +21,7 @@
 #include <vector>
 
 #include "arrow/api.h"
-
+#include "arrow/io/interfaces.h"
 #include "arrow/python/numpy_interop.h"
 #include "arrow/python/sequence.h"
@@ -35,52 +35,14 @@ extern PyObject* pyarrow_deserialize_callback;
 namespace arrow {
 namespace py {
 
-/// Constructing dictionaries of key/value pairs. Sequences of
-/// keys and values are built separately using a pair of
-/// SequenceBuilders. The resulting Arrow representation
-/// can be obtained via the Finish method.
-class DictBuilder {
- public:
-  explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {}
-
-  /// Builder for the keys of the dictionary
-  SequenceBuilder& keys() { return keys_; }
-  /// Builder for the values of the dictionary
-  SequenceBuilder& vals() { return vals_; }
-
-  /// Construct an Arrow StructArray representing the dictionary.
-  /// Contains a field "keys" for the keys and "vals" for the values.
-
-  /// \param list_data
-  ///    List containing the data from nested lists in the value
-  ///    list of the dictionary
-  ///
-  /// \param dict_data
-  ///    List containing the data from nested dictionaries in the
-  ///    value list of the dictionary
-  arrow::Status Finish(std::shared_ptr<Array> key_tuple_data,
-                       std::shared_ptr<Array> key_dict_data,
-                       std::shared_ptr<Array> val_list_data,
-                       std::shared_ptr<Array> val_tuple_data,
-                       std::shared_ptr<Array> val_dict_data, std::shared_ptr<Array>* out);
-
- private:
-  SequenceBuilder keys_;
-  SequenceBuilder vals_;
-};
-
-arrow::Status SerializeSequences(std::vector<PyObject*> sequences,
-                                 int32_t recursion_depth,
-                                 std::shared_ptr<Array>* out,
-                                 std::vector<PyObject*>* tensors_out);
-
-arrow::Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
-                            std::shared_ptr<Array>* out,
-                            std::vector<PyObject*>* tensors_out);
+Status WriteSerializedPythonSequence(std::shared_ptr<RecordBatch> batch,
+                                     std::vector<std::shared_ptr<Tensor>> tensors,
+                                     io::OutputStream* dst);
 
-arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
-                             std::vector<PyObject*>* subdicts,
-                             std::vector<PyObject*>* tensors_out);
+Status SerializeSequences(std::vector<PyObject*> sequences,
+                          int32_t recursion_depth,
+                          std::shared_ptr<Array>* out,
+                          std::vector<PyObject*>* tensors_out);
 
 std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data);
diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi
index bc8fd72a133ad..53680ce5748e8 100644
--- a/python/pyarrow/serialization.pxi
+++ b/python/pyarrow/serialization.pxi
@@ -30,6 +30,16 @@ cdef extern from "arrow/python/python_to_arrow.h" namespace 'arrow::py':
 
     cdef shared_ptr[CRecordBatch] MakeBatch(shared_ptr[CArray] data)
 
+cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:
+
+    cdef CStatus WriteSerializedPythonSequence(shared_ptr[CRecordBatch] batch,
+                                               c_vector[shared_ptr[CTensor]] tensors,
+                                               OutputStream* dst)
+
+    cdef CStatus ReadSerializedPythonSequence(shared_ptr[RandomAccessFile] src,
+                                              shared_ptr[CRecordBatch]* batch_out,
+                                              c_vector[shared_ptr[CTensor]]* tensors_out)
+
 cdef extern from "arrow/python/python_to_arrow.h":
 
     cdef extern PyObject *pyarrow_serialize_callback
@@ -180,46 +190,16 @@ def deserialize_sequence(PythonObject value, object base):
 
 def write_python_object(PythonObject value, int32_t num_tensors, NativeFile sink):
     cdef shared_ptr[OutputStream] stream
     sink.write_handle(&stream)
-    cdef shared_ptr[CRecordBatchStreamWriter] writer
-    cdef shared_ptr[CSchema] schema = deref(value.batch).schema()
-    cdef shared_ptr[CRecordBatch] batch = value.batch
-    cdef shared_ptr[CTensor] tensor
-    cdef int32_t metadata_length
-    cdef int64_t body_length
     with nogil:
-        # write number of tensors
-        check_status(stream.get().Write(<uint8_t*> &num_tensors, sizeof(int32_t)))
-
-        check_status(CRecordBatchStreamWriter.Open(stream.get(), schema, &writer))
-        check_status(deref(writer).WriteRecordBatch(deref(batch)))
-        check_status(deref(writer).Close())
-
-        for tensor in value.tensors:
-            check_status(WriteTensor(deref(tensor), stream.get(), &metadata_length, &body_length))
+        check_status(WriteSerializedPythonSequence(value.batch, value.tensors,
+                                                   stream.get()))
 
 
 def read_python_object(NativeFile source):
     cdef PythonObject result = PythonObject()
     cdef shared_ptr[RandomAccessFile] stream
     source.read_handle(&stream)
-    cdef shared_ptr[CRecordBatchStreamReader] reader
-    cdef shared_ptr[CTensor] tensor
-    cdef int64_t offset
-    cdef int64_t bytes_read
-    cdef int32_t num_tensors
     with nogil:
-        # read number of tensors
-        check_status(stream.get().Read(sizeof(int32_t), &bytes_read, <uint8_t*> &num_tensors))
-
-        check_status(CRecordBatchStreamReader.Open(<shared_ptr[InputStream]> stream, &reader))
-        check_status(reader.get().ReadNextRecordBatch(&result.batch))
-
-        check_status(deref(stream).Tell(&offset))
-
-        for i in range(num_tensors):
-            check_status(ReadTensor(offset, stream.get(), &tensor))
-            result.tensors.push_back(tensor)
-            check_status(deref(stream).Tell(&offset))
+        check_status(ReadSerializedPythonSequence(stream, &result.batch,
+                                                  &result.tensors))
 
     return result