restructure
pcmoritz committed Aug 17, 2017
1 parent c1f377b commit e1fc0c5
Showing 6 changed files with 112 additions and 79 deletions.
2 changes: 2 additions & 0 deletions cpp/src/arrow/python/api.h
@@ -19,11 +19,13 @@
#define ARROW_PYTHON_API_H

#include "arrow/python/arrow_to_pandas.h"
#include "arrow/python/arrow_to_python.h"
#include "arrow/python/builtin_convert.h"
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/io.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/pandas_to_arrow.h"
#include "arrow/python/python_to_arrow.h"

#endif // ARROW_PYTHON_API_H
22 changes: 22 additions & 0 deletions cpp/src/arrow/python/arrow_to_python.cc
@@ -19,6 +19,7 @@

#include "arrow/util/logging.h"

#include "arrow/ipc/reader.h"
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
@@ -183,5 +184,26 @@ Status DeserializeTuple(std::shared_ptr<Array> array, int32_t start_idx, int32_t
PyTuple_SetItem, out);
}

Status ReadSerializedPythonSequence(std::shared_ptr<io::RandomAccessFile> src,
std::shared_ptr<RecordBatch>* batch_out,
std::vector<std::shared_ptr<Tensor>>* tensors_out) {
std::shared_ptr<ipc::RecordBatchStreamReader> reader;
int64_t offset;
int64_t bytes_read;
int32_t num_tensors;
// Read number of tensors
RETURN_NOT_OK(src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
RETURN_NOT_OK(reader->ReadNextRecordBatch(batch_out));
RETURN_NOT_OK(src->Tell(&offset));
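// The tensors follow the record batch in the stream; read each one at its current offset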
for (int i = 0; i < num_tensors; ++i) {
std::shared_ptr<Tensor> tensor;
RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor));
tensors_out->push_back(tensor);
RETURN_NOT_OK(src->Tell(&offset));
}
return Status::OK();
}

} // namespace py
} // namespace arrow
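
For orientation, the stream this new reader consumes is laid out as a raw int32 tensor count, followed by a record batch stream, followed by each tensor as written by ipc::WriteTensor. Since the source parameter is an io::RandomAccessFile, any random-access input works; below is a minimal sketch that reads from a file on disk. The helper name and path are illustrative only, and the Status-returning ReadableFile::Open signature reflects this era's Arrow C++ API, so details may differ in other versions:

```cpp
// Sketch only, not part of this commit: io::ReadableFile implements
// io::RandomAccessFile, the source type ReadSerializedPythonSequence expects.
#include <memory>
#include <string>
#include <vector>

#include "arrow/io/file.h"
#include "arrow/python/arrow_to_python.h"

arrow::Status ReadFromFile(const std::string& path,
                           std::shared_ptr<arrow::RecordBatch>* batch_out,
                           std::vector<std::shared_ptr<arrow::Tensor>>* tensors_out) {
  std::shared_ptr<arrow::io::ReadableFile> file;
  // Open the file for random-access reads.
  RETURN_NOT_OK(arrow::io::ReadableFile::Open(path, &file));
  // Reads the tensor count, the record batch, and then each tensor in turn.
  return arrow::py::ReadSerializedPythonSequence(file, batch_out, tensors_out);
}
```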
5 changes: 5 additions & 0 deletions cpp/src/arrow/python/arrow_to_python.h
@@ -21,6 +21,7 @@
#include <Python.h>

#include "arrow/api.h"
#include "arrow/io/interfaces.h"

#include <vector>

@@ -39,6 +40,10 @@ Status DeserializeList(std::shared_ptr<Array> array, int32_t start_idx, int32_t
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);

Status ReadSerializedPythonSequence(std::shared_ptr<io::RandomAccessFile> src,
std::shared_ptr<RecordBatch>* batch_out,
std::vector<std::shared_ptr<Tensor>>* tensors_out);

} // namespace py
} // namespace arrow

64 changes: 63 additions & 1 deletion cpp/src/arrow/python/python_to_arrow.cc
@@ -22,7 +22,7 @@
#include <numpy/arrayobject.h>
#include <numpy/arrayscalars.h>

#include "arrow/api.h"
#include "arrow/ipc/writer.h"
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_interop.h"
@@ -39,6 +39,40 @@ PyObject* pyarrow_deserialize_callback = NULL;
namespace arrow {
namespace py {

/// Constructing dictionaries of key/value pairs. Sequences of
/// keys and values are built separately using a pair of
/// SequenceBuilders. The resulting Arrow representation
/// can be obtained via the Finish method.
class DictBuilder {
public:
explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {}

/// Builder for the keys of the dictionary
SequenceBuilder& keys() { return keys_; }
/// Builder for the values of the dictionary
SequenceBuilder& vals() { return vals_; }

/// Construct an Arrow StructArray representing the dictionary.
/// Contains a field "keys" for the keys and "vals" for the values.

/// \param list_data
/// List containing the data from nested lists in the value
/// list of the dictionary
///
/// \param dict_data
/// List containing the data from nested dictionaries in the
/// value list of the dictionary
arrow::Status Finish(std::shared_ptr<Array> key_tuple_data,
std::shared_ptr<Array> key_dict_data,
std::shared_ptr<Array> val_list_data,
std::shared_ptr<Array> val_tuple_data,
std::shared_ptr<Array> val_dict_data, std::shared_ptr<Array>* out);

private:
SequenceBuilder keys_;
SequenceBuilder vals_;
};

Status DictBuilder::Finish(std::shared_ptr<Array> key_tuple_data,
std::shared_ptr<Array> key_dict_data,
std::shared_ptr<Array> val_list_data,
@@ -89,6 +123,14 @@ Status CallCustomSerializationCallback(PyObject* elem, PyObject** serialized_obj
return Status::OK();
}

Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out);

Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
std::vector<PyObject*>* subdicts,
std::vector<PyObject*>* tensors_out);

Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
if (PyArray_IsScalar(obj, Bool)) {
return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
@@ -335,5 +377,25 @@ std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
return std::shared_ptr<RecordBatch>(new RecordBatch(schema, data->length(), {data}));
}

Status WriteSerializedPythonSequence(std::shared_ptr<RecordBatch> batch,
std::vector<std::shared_ptr<Tensor>> tensors,
io::OutputStream* dst) {
int32_t num_tensors = tensors.size();
std::shared_ptr<ipc::RecordBatchStreamWriter> writer;
int32_t metadata_length;
int64_t body_length;

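// Write the number of tensors as a prefix so the reader knows how many follow the record batch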
RETURN_NOT_OK(dst->Write(reinterpret_cast<uint8_t*>(&num_tensors), sizeof(int32_t)));
RETURN_NOT_OK(ipc::RecordBatchStreamWriter::Open(dst, batch->schema(), &writer));
RETURN_NOT_OK(writer->WriteRecordBatch(*batch));
RETURN_NOT_OK(writer->Close());

for (const auto& tensor : tensors) {
RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
}

return Status::OK();
}

} // namespace py
} // namespace arrow
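
WriteSerializedPythonSequence is the writing counterpart of ReadSerializedPythonSequence in arrow_to_python.cc, which lets the Cython code below delegate the whole write/read path to these two C++ entry points. A hedged round-trip sketch follows, assuming `batch` and `tensors` were produced by SerializeSequences/MakeBatch, an arbitrary 1024-byte initial capacity, and the Status-returning BufferOutputStream/BufferReader signatures of this era's Arrow C++ API:

```cpp
// Round-trip sketch, not part of this commit: write the batch and tensors to an
// in-memory stream, then read them back with the new reader.
#include <memory>
#include <vector>

#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/python/arrow_to_python.h"
#include "arrow/python/python_to_arrow.h"

arrow::Status RoundTrip(const std::shared_ptr<arrow::RecordBatch>& batch,
                        const std::vector<std::shared_ptr<arrow::Tensor>>& tensors,
                        std::shared_ptr<arrow::RecordBatch>* batch_out,
                        std::vector<std::shared_ptr<arrow::Tensor>>* tensors_out) {
  std::shared_ptr<arrow::io::BufferOutputStream> sink;
  RETURN_NOT_OK(arrow::io::BufferOutputStream::Create(
      1024, arrow::default_memory_pool(), &sink));
  RETURN_NOT_OK(arrow::py::WriteSerializedPythonSequence(batch, tensors, sink.get()));

  // Grab the written bytes and wrap them in a random-access reader.
  std::shared_ptr<arrow::Buffer> buffer;
  RETURN_NOT_OK(sink->Finish(&buffer));
  auto source = std::make_shared<arrow::io::BufferReader>(buffer);
  return arrow::py::ReadSerializedPythonSequence(source, batch_out, tensors_out);
}
```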
54 changes: 8 additions & 46 deletions cpp/src/arrow/python/python_to_arrow.h
@@ -21,7 +21,7 @@
#include <Python.h>

#include "arrow/api.h"

#include "arrow/io/interfaces.h"
#include "arrow/python/numpy_interop.h"
#include "arrow/python/sequence.h"

@@ -35,52 +35,14 @@ extern PyObject* pyarrow_deserialize_callback;
namespace arrow {
namespace py {

/// Constructing dictionaries of key/value pairs. Sequences of
/// keys and values are built separately using a pair of
/// SequenceBuilders. The resulting Arrow representation
/// can be obtained via the Finish method.
class DictBuilder {
public:
explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {}

/// Builder for the keys of the dictionary
SequenceBuilder& keys() { return keys_; }
/// Builder for the values of the dictionary
SequenceBuilder& vals() { return vals_; }

/// Construct an Arrow StructArray representing the dictionary.
/// Contains a field "keys" for the keys and "vals" for the values.

/// \param list_data
/// List containing the data from nested lists in the value
/// list of the dictionary
///
/// \param dict_data
/// List containing the data from nested dictionaries in the
/// value list of the dictionary
arrow::Status Finish(std::shared_ptr<Array> key_tuple_data,
std::shared_ptr<Array> key_dict_data,
std::shared_ptr<Array> val_list_data,
std::shared_ptr<Array> val_tuple_data,
std::shared_ptr<Array> val_dict_data, std::shared_ptr<Array>* out);

private:
SequenceBuilder keys_;
SequenceBuilder vals_;
};

arrow::Status SerializeSequences(std::vector<PyObject*> sequences,
int32_t recursion_depth,
std::shared_ptr<arrow::Array>* out,
std::vector<PyObject*>* tensors_out);

arrow::Status SerializeDict(std::vector<PyObject*> dicts, int32_t recursion_depth,
std::shared_ptr<arrow::Array>* out,
std::vector<PyObject*>* tensors_out);
Status WriteSerializedPythonSequence(std::shared_ptr<RecordBatch> batch,
std::vector<std::shared_ptr<Tensor>> tensors,
io::OutputStream* dst);

arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder* builder,
std::vector<PyObject*>* subdicts,
std::vector<PyObject*>* tensors_out);
Status SerializeSequences(std::vector<PyObject*> sequences,
int32_t recursion_depth,
std::shared_ptr<Array>* out,
std::vector<PyObject*>* tensors_out);

std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data);

44 changes: 12 additions & 32 deletions python/pyarrow/serialization.pxi
@@ -30,6 +30,16 @@ cdef extern from "arrow/python/python_to_arrow.h" namespace 'arrow::py':

cdef shared_ptr[CRecordBatch] MakeBatch(shared_ptr[CArray] data)

cdef extern from "arrow/python/api.h" namespace 'arrow::py' nogil:

cdef CStatus WriteSerializedPythonSequence(shared_ptr[CRecordBatch] batch,
c_vector[shared_ptr[CTensor]] tensors,
OutputStream* dst)

cdef CStatus ReadSerializedPythonSequence(shared_ptr[RandomAccessFile] src,
shared_ptr[CRecordBatch]* batch_out,
c_vector[shared_ptr[CTensor]]* tensors_out)

cdef extern from "arrow/python/python_to_arrow.h":

cdef extern PyObject *pyarrow_serialize_callback
@@ -180,46 +190,16 @@ def deserialize_sequence(PythonObject value, object base):
def write_python_object(PythonObject value, int32_t num_tensors, NativeFile sink):
cdef shared_ptr[OutputStream] stream
sink.write_handle(&stream)
cdef shared_ptr[CRecordBatchStreamWriter] writer
cdef shared_ptr[CSchema] schema = deref(value.batch).schema()
cdef shared_ptr[CRecordBatch] batch = value.batch
cdef shared_ptr[CTensor] tensor
cdef int32_t metadata_length
cdef int64_t body_length

with nogil:
# write number of tensors
check_status(stream.get().Write(<uint8_t*> &num_tensors, sizeof(int32_t)))

check_status(CRecordBatchStreamWriter.Open(stream.get(), schema, &writer))
check_status(deref(writer).WriteRecordBatch(deref(batch)))
check_status(deref(writer).Close())

for tensor in value.tensors:
check_status(WriteTensor(deref(tensor), stream.get(), &metadata_length, &body_length))
check_status(WriteSerializedPythonSequence(value.batch, value.tensors, stream.get()))

def read_python_object(NativeFile source):
cdef PythonObject result = PythonObject()
cdef shared_ptr[RandomAccessFile] stream
source.read_handle(&stream)
cdef shared_ptr[CRecordBatchStreamReader] reader
cdef shared_ptr[CTensor] tensor
cdef int64_t offset
cdef int64_t bytes_read
cdef int32_t num_tensors

with nogil:
# read number of tensors
check_status(stream.get().Read(sizeof(int32_t), &bytes_read, <uint8_t*> &num_tensors))

check_status(CRecordBatchStreamReader.Open(<shared_ptr[InputStream]> stream, &reader))
check_status(reader.get().ReadNextRecordBatch(&result.batch))

check_status(deref(stream).Tell(&offset))

for i in range(num_tensors):
check_status(ReadTensor(offset, stream.get(), &tensor))
result.tensors.push_back(tensor)
check_status(deref(stream).Tell(&offset))
check_status(ReadSerializedPythonSequence(stream, &result.batch, &result.tensors))

return result
