Commit faf9a3e

make exported API more consistent

pcmoritz committed Aug 17, 2017
1 parent e1fc0c5 commit faf9a3e
Showing 5 changed files with 64 additions and 45 deletions.
12 changes: 12 additions & 0 deletions cpp/src/arrow/python/arrow_to_python.cc
@@ -34,6 +34,11 @@ Status DeserializeTuple(std::shared_ptr<Array> array, int32_t start_idx, int32_t
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);

Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);

Status DeserializeDict(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
@@ -205,5 +210,12 @@ Status ReadSerializedPythonSequence(std::shared_ptr<io::RandomAccessFile> src,
return Status::OK();
}

Status DeserializePythonSequence(std::shared_ptr<RecordBatch> batch,
std::vector<std::shared_ptr<Tensor>> tensors,
PyObject* base,
PyObject** out) {
return DeserializeList(batch->column(0), 0, batch->num_rows(), base, tensors, out);
}

} // namespace py
} // namespace arrow
10 changes: 5 additions & 5 deletions cpp/src/arrow/python/arrow_to_python.h
@@ -35,15 +35,15 @@ namespace py {

Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result);

Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);

Status ReadSerializedPythonSequence(std::shared_ptr<io::RandomAccessFile> src,
std::shared_ptr<RecordBatch>* batch_out,
std::vector<std::shared_ptr<Tensor>>* tensors_out);

Status DeserializePythonSequence(std::shared_ptr<RecordBatch> batch,
std::vector<std::shared_ptr<Tensor>> tensors,
PyObject* base,
PyObject** out);

} // namespace py
} // namespace arrow
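
For context, a minimal read-side sketch (not part of this commit) of how the two calls exported from arrow_to_python.h are assumed to fit together. The io::BufferReader source, the ReadAndDeserialize helper name, and the use of `base` as the owner that keeps tensor buffers alive are illustrative assumptions, not code from this change.

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/python/api.h"

// Read a serialized Python sequence out of an in-memory buffer and rebuild
// the Python object; `base` is assumed to own the buffers backing any tensors.
arrow::Status ReadAndDeserialize(const std::shared_ptr<arrow::Buffer>& buffer,
                                 PyObject* base, PyObject** out) {
  using namespace arrow;
  auto source = std::make_shared<io::BufferReader>(buffer);
  std::shared_ptr<RecordBatch> batch;
  std::vector<std::shared_ptr<Tensor>> tensors;
  RETURN_NOT_OK(py::ReadSerializedPythonSequence(source, &batch, &tensors));
  return py::DeserializePythonSequence(batch, tensors, base, out);
}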

22 changes: 22 additions & 0 deletions cpp/src/arrow/python/python_to_arrow.cc
@@ -25,6 +25,7 @@
#include "arrow/ipc/writer.h"
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/numpy_interop.h"
#include "arrow/python/platform.h"
#include "arrow/python/sequence.h"
@@ -131,6 +132,11 @@ Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
std::vector<PyObject*>* subdicts,
std::vector<PyObject*>* tensors_out);

Status SerializeSequences(std::vector<PyObject*> sequences,
int32_t recursion_depth,
std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out);

Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
if (PyArray_IsScalar(obj, Bool)) {
return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
@@ -377,6 +383,22 @@ std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
}

Status SerializePythonSequence(PyObject* sequence,
std::shared_ptr<RecordBatch>* batch_out,
std::vector<std::shared_ptr<Tensor>>* tensors_out) {
std::vector<PyObject*> sequences = {sequence};
std::shared_ptr<Array> array;
std::vector<PyObject*> tensors;
RETURN_NOT_OK(SerializeSequences(sequences, 0, &array, &tensors));
*batch_out = MakeBatch(array);
for (const auto &tensor : tensors) {
std::shared_ptr<Tensor> out;
RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(), tensor, &out));
tensors_out->push_back(out);
}
return Status::OK();
}

Status WriteSerializedPythonSequence(std::shared_ptr<RecordBatch> batch,
std::vector<std::shared_ptr<Tensor>> tensors,
io::OutputStream* dst) {
11 changes: 4 additions & 7 deletions cpp/src/arrow/python/python_to_arrow.h
@@ -35,17 +35,14 @@ extern PyObject* pyarrow_deserialize_callback;
namespace arrow {
namespace py {

Status SerializePythonSequence(PyObject* sequence,
std::shared_ptr<RecordBatch>* batch_out,
std::vector<std::shared_ptr<Tensor>>* tensors_out);

Status WriteSerializedPythonSequence(std::shared_ptr<RecordBatch> batch,
std::vector<std::shared_ptr<Tensor>> tensors,
io::OutputStream* dst);

Status SerializeSequences(std::vector<PyObject*> sequences,
int32_t recursion_depth,
std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out);

std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data);

} // namespace py
} // namespace arrow
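
The matching write-side sketch (again illustrative, not part of the commit): SerializePythonSequence produces the record batch and tensor list that WriteSerializedPythonSequence then streams out. The SerializeAndWrite helper, the io::BufferOutputStream sink, and its Status-returning Create/Finish signatures are assumptions about the stream API of this era.

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/python/api.h"

// Serialize a Python sequence into a record batch plus tensors, write both to
// an in-memory stream, and hand back the finished buffer.
arrow::Status SerializeAndWrite(PyObject* sequence,
                                std::shared_ptr<arrow::Buffer>* out) {
  using namespace arrow;
  std::shared_ptr<RecordBatch> batch;
  std::vector<std::shared_ptr<Tensor>> tensors;
  RETURN_NOT_OK(py::SerializePythonSequence(sequence, &batch, &tensors));

  std::shared_ptr<io::BufferOutputStream> sink;
  RETURN_NOT_OK(io::BufferOutputStream::Create(1 << 20, default_memory_pool(), &sink));
  RETURN_NOT_OK(py::WriteSerializedPythonSequence(batch, tensors, sink.get()));
  return sink->Finish(out);
}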

54 changes: 21 additions & 33 deletions python/pyarrow/serialization.pxi
@@ -22,36 +22,37 @@ from cython.operator cimport dereference as deref

from pyarrow.compat import pickle

cdef extern from "arrow/python/python_to_arrow.h" namespace 'arrow::py':
cdef extern from "arrow/python/api.h" namespace 'arrow::py':

cdef CStatus SerializeSequences(c_vector[PyObject*] sequences,
int32_t recursion_depth, shared_ptr[CArray]* array_out,
c_vector[PyObject*]* tensors_out)
CStatus SerializePythonSequence(
PyObject* sequence,
shared_ptr[CRecordBatch]* batch_out,
c_vector[shared_ptr[CTensor]]* tensors_out)

cdef shared_ptr[CRecordBatch] MakeBatch(shared_ptr[CArray] data)
CStatus DeserializePythonSequence(
shared_ptr[CRecordBatch] batch,
c_vector[shared_ptr[CTensor]] tensors,
PyObject* base,
PyObject** out)

cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:

cdef CStatus WriteSerializedPythonSequence(shared_ptr[CRecordBatch] batch,
c_vector[shared_ptr[CTensor]] tensors,
OutputStream* dst)
cdef CStatus WriteSerializedPythonSequence(
shared_ptr[CRecordBatch] batch,
c_vector[shared_ptr[CTensor]] tensors,
OutputStream* dst)

cdef CStatus ReadSerializedPythonSequence(shared_ptr[RandomAccessFile] src,
shared_ptr[CRecordBatch]* batch_out,
c_vector[shared_ptr[CTensor]]* tensors_out)
cdef CStatus ReadSerializedPythonSequence(
shared_ptr[RandomAccessFile] src,
shared_ptr[CRecordBatch]* batch_out,
c_vector[shared_ptr[CTensor]]* tensors_out)

cdef extern from "arrow/python/python_to_arrow.h":

cdef extern PyObject *pyarrow_serialize_callback

cdef extern PyObject *pyarrow_deserialize_callback

cdef extern from "arrow/python/arrow_to_python.h" namespace 'arrow::py':

cdef CStatus DeserializeList(shared_ptr[CArray] array, int32_t start_idx,
int32_t stop_idx, PyObject* base,
const c_vector[shared_ptr[CTensor]]& tensors, PyObject** out)

cdef class PythonObject:

cdef:
@@ -164,27 +165,14 @@ pyarrow_deserialize_callback = <PyObject*> deserialization_callback

# Main entry point for serialization
def serialize_sequence(object value):
cdef int32_t recursion_depth = 0
cdef PythonObject result = PythonObject()
cdef c_vector[PyObject*] sequences
cdef shared_ptr[CArray] array
cdef c_vector[PyObject*] tensors
cdef PyObject* tensor
cdef shared_ptr[CTensor] out
sequences.push_back(<PyObject*> value)
check_status(SerializeSequences(sequences, recursion_depth, &array, &tensors))
result.batch = MakeBatch(array)
num_tensors = 0
for tensor in tensors:
check_status(NdarrayToTensor(c_default_memory_pool(), <object> tensor, &out))
result.tensors.push_back(out)
num_tensors += 1
return result, num_tensors
check_status(SerializePythonSequence(<PyObject*> value, &result.batch, &result.tensors))
return result, result.tensors.size()

# Main entry point for deserialization
def deserialize_sequence(PythonObject value, object base):
cdef PyObject* result
check_status(DeserializeList(deref(value.batch).column(0), 0, deref(value.batch).num_rows(), <PyObject*> base, value.tensors, &result))
check_status(DeserializePythonSequence(value.batch, value.tensors, <PyObject*> base, &result))
return <object> result

def write_python_object(PythonObject value, int32_t num_tensors, NativeFile sink):
