Skip to content

Commit

Permalink
ARROW-759: [Python] Serializing large class of Python objects in Apac…
Browse files Browse the repository at this point in the history
…he Arrow

This PR adds the capability to serialize a large class of (nested) Python objects in Apache Arrow. The eventual goal is to evolve this into a more modern version of pickle that will make it possible to read the data from other languages supported by Apache Arrow (and might also be faster).

Currently we support lists, tuples, dicts, strings, numpy objects, Python classes and namedtuples. A fallback to (cloud-)pickle can be provided for objects that cannot be natively represented in Arrow (for example lambdas).

Numpy data within objects is efficiently represented using Arrow's Tensor facilities and for the nested Python sequences we use Arrow's UnionArray.

There are many loose ends that will need to be addressed in follow up PRs.

Author: Philipp Moritz <pcmoritz@gmail.com>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #965 from pcmoritz/python-serialization and squashes the following commits:

31486ed [Wes McKinney] Fix typo
2164db7 [Wes McKinney] Add SerializedPyObject to public API
b70235c [Wes McKinney] Add pyarrow.deserialize convenience method
a6a402e [Wes McKinney] Memory map fixture robustness on Windows
114a5fb [Wes McKinney] Add a Python container for the SerializedPyObject data, total_bytes method
8e59617 [Wes McKinney] Use pytest tmpdir for large memory map fixture so works on Windows
8a42f30 [Wes McKinney] Add doxygen comment to set_serialization_callbacks
a9522c5 [Wes McKinney] Refactoring, address code review comments. fix flake8 issues
ce5784d [Wes McKinney] Do not use ARROW_CHECK in production code. Consolidate python_to_arrow code
c8efef9 [Wes McKinney] Fix various Clang compiler warnings due to integer conversions. clang-format
831e2f2 [Philipp Moritz] remove sequence.h
54af39b [Philipp Moritz] more fixes
a6fdb76 [Philipp Moritz] make tests work
fe56c73 [Philipp Moritz] fixes
84d62f6 [Philipp Moritz] more fixes
49aba8a [Philipp Moritz] make it compile on windows
aa1f300 [Philipp Moritz] linting
95cb9da [Philipp Moritz] fix GIL
adcc8f7 [Philipp Moritz] shuffle stuff around
bcebdfe [Philipp Moritz] fix longlong vs int64 and unsigned variant
4cc45cd [Philipp Moritz] cleanup
f25f3f3 [Philipp Moritz] cleanups
a88d410 [Philipp Moritz] convert DESERIALIZE_SEQUENCE back to a macro
c425978 [Philipp Moritz] prevent possible memory leaks
aeafd82 [Philipp Moritz] fix callbacks
389bfc6 [Philipp Moritz] documentation
2f0760c [Philipp Moritz] fix api
faf9a3e [Philipp Moritz] make exported API more consistent
e1fc0c5 [Philipp Moritz] restructure
c1f377b [Philipp Moritz] more fixes
3e94e6d [Philipp Moritz] clang-format
99e2d1a [Philipp Moritz] cleanups
3298329 [Philipp Moritz] mutable refs and small fixes
e73c1ea [Philipp Moritz] make DictBuilder private
3929273 [Philipp Moritz] increase Py_True refcount and hide helper methods
aaf6f09 [Philipp Moritz] remove code duplication
c38c58d [Philipp Moritz] get rid of leaks and clarify reference counting for dicts
74b9e46 [Philipp Moritz] convert DESERIALIZE_SEQUENCE to a template
080db03 [Philipp Moritz] fix first few comments
a6105d2 [Philipp Moritz] lint fix
802e739 [Philipp Moritz] clang-format
2e08de4 [Philipp Moritz] fix namespaces
91b57d5 [Philipp Moritz] fix linting
c4782ac [Philipp Moritz] fix
7069e20 [Philipp Moritz] fix imports
2171761 [Philipp Moritz] fix python unicode string
30bb960 [Philipp Moritz] rebase
f229d8d [Philipp Moritz] serialization of custom objects
8b2ffe6 [Philipp Moritz] working version
bd36c83 [Philipp Moritz] handle very long longs with custom serialization callback
49a4acb [Philipp Moritz] roundtrip working for the first time
44fb98b [Philipp Moritz] work in progress
3af1c67 [Philipp Moritz] deserialization path (need to figure out if base object and refcounting is handled correctly)
deb3b46 [Philipp Moritz] rename serialization entry point
5766b8c [Philipp Moritz] python to arrow serialization
  • Loading branch information
pcmoritz authored and wesm committed Aug 20, 2017
1 parent 10f7158 commit b50f235
Show file tree
Hide file tree
Showing 16 changed files with 1,619 additions and 9 deletions.
12 changes: 6 additions & 6 deletions cpp/src/arrow/builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
using PrimitiveBuilder<T>::Reserve;

/// Append a single scalar and increase the size if necessary.
Status Append(value_type val) {
Status Append(const value_type val) {
RETURN_NOT_OK(ArrayBuilder::Reserve(1));
UnsafeAppend(val);
return Status::OK();
Expand All @@ -255,7 +255,7 @@ class ARROW_EXPORT NumericBuilder : public PrimitiveBuilder<T> {
///
/// This method does not capacity-check; make sure to call Reserve
/// beforehand.
void UnsafeAppend(value_type val) {
void UnsafeAppend(const value_type val) {
BitUtil::SetBit(null_bitmap_data_, length_);
raw_data_[length_++] = val;
}
Expand Down Expand Up @@ -371,7 +371,7 @@ class ARROW_EXPORT AdaptiveUIntBuilder : public internal::AdaptiveIntBuilderBase
using ArrayBuilder::Advance;

/// Scalar append
Status Append(uint64_t val) {
Status Append(const uint64_t val) {
RETURN_NOT_OK(Reserve(1));
BitUtil::SetBit(null_bitmap_data_, length_);

Expand Down Expand Up @@ -430,7 +430,7 @@ class ARROW_EXPORT AdaptiveIntBuilder : public internal::AdaptiveIntBuilderBase
using ArrayBuilder::Advance;

/// Scalar append
Status Append(int64_t val) {
Status Append(const int64_t val) {
RETURN_NOT_OK(Reserve(1));
BitUtil::SetBit(null_bitmap_data_, length_);

Expand Down Expand Up @@ -511,7 +511,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
std::shared_ptr<Buffer> data() const { return data_; }

/// Scalar append
Status Append(bool val) {
Status Append(const bool val) {
RETURN_NOT_OK(Reserve(1));
BitUtil::SetBit(null_bitmap_data_, length_);
if (val) {
Expand All @@ -523,7 +523,7 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
return Status::OK();
}

Status Append(uint8_t val) { return Append(val != 0); }
Status Append(const uint8_t val) { return Append(val != 0); }

/// Vector append
///
Expand Down
4 changes: 4 additions & 0 deletions cpp/src/arrow/python/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ set(ARROW_PYTHON_TEST_LINK_LIBS ${ARROW_PYTHON_MIN_TEST_LIBS})

set(ARROW_PYTHON_SRCS
arrow_to_pandas.cc
arrow_to_python.cc
builtin_convert.cc
common.cc
config.cc
Expand All @@ -51,6 +52,7 @@ set(ARROW_PYTHON_SRCS
io.cc
numpy_convert.cc
pandas_to_arrow.cc
python_to_arrow.cc
pyarrow.cc
)

Expand Down Expand Up @@ -83,6 +85,7 @@ endif()
install(FILES
api.h
arrow_to_pandas.h
arrow_to_python.h
builtin_convert.h
common.h
config.h
Expand All @@ -92,6 +95,7 @@ install(FILES
numpy_convert.h
numpy_interop.h
pandas_to_arrow.h
python_to_arrow.h
platform.h
pyarrow.h
type_traits.h
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/python/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
#define ARROW_PYTHON_API_H

#include "arrow/python/arrow_to_pandas.h"
#include "arrow/python/arrow_to_python.h"
#include "arrow/python/builtin_convert.h"
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/io.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/pandas_to_arrow.h"
#include "arrow/python/python_to_arrow.h"

#endif // ARROW_PYTHON_API_H
221 changes: 221 additions & 0 deletions cpp/src/arrow/python/arrow_to_python.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/python/arrow_to_python.h"

#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/array.h"
#include "arrow/io/interfaces.h"
#include "arrow/ipc/reader.h"
#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/table.h"
#include "arrow/util/logging.h"

extern "C" {
extern PyObject* pyarrow_serialize_callback;
extern PyObject* pyarrow_deserialize_callback;
}

namespace arrow {
namespace py {

Status CallCustomCallback(PyObject* callback, PyObject* elem, PyObject** result);

Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);

Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out);

Status DeserializeDict(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out) {
auto data = std::dynamic_pointer_cast<StructArray>(array);
ScopedRef keys, vals;
ScopedRef result(PyDict_New());
RETURN_NOT_OK(
DeserializeList(data->field(0), start_idx, stop_idx, base, tensors, keys.ref()));
RETURN_NOT_OK(
DeserializeList(data->field(1), start_idx, stop_idx, base, tensors, vals.ref()));
for (int64_t i = start_idx; i < stop_idx; ++i) {
// PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
// The latter two steal references whereas PyDict_SetItem does not. So we need
// to make sure the reference count is decremented by letting the ScopedRef
// go out of scope at the end.
PyDict_SetItem(result.get(), PyList_GET_ITEM(keys.get(), i - start_idx),
PyList_GET_ITEM(vals.get(), i - start_idx));
}
static PyObject* py_type = PyUnicode_FromString("_pytype_");
if (PyDict_Contains(result.get(), py_type)) {
RETURN_NOT_OK(CallCustomCallback(pyarrow_deserialize_callback, result.get(), out));
} else {
*out = result.release();
}
return Status::OK();
}

Status DeserializeArray(std::shared_ptr<Array> array, int64_t offset, PyObject* base,
const std::vector<std::shared_ptr<arrow::Tensor>>& tensors,
PyObject** out) {
DCHECK(array);
int32_t index = std::static_pointer_cast<Int32Array>(array)->Value(offset);
RETURN_NOT_OK(py::TensorToNdarray(*tensors[index], base, out));
// Mark the array as immutable
ScopedRef flags(PyObject_GetAttrString(*out, "flags"));
DCHECK(flags.get() != NULL) << "Could not mark Numpy array immutable";
Py_INCREF(Py_False);
int flag_set = PyObject_SetAttrString(flags.get(), "writeable", Py_False);
DCHECK(flag_set == 0) << "Could not mark Numpy array immutable";
return Status::OK();
}

Status GetValue(std::shared_ptr<Array> arr, int64_t index, int32_t type, PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors, PyObject** result) {
switch (arr->type()->id()) {
case Type::BOOL:
*result =
PyBool_FromLong(std::static_pointer_cast<BooleanArray>(arr)->Value(index));
return Status::OK();
case Type::INT64:
*result =
PyLong_FromSsize_t(std::static_pointer_cast<Int64Array>(arr)->Value(index));
return Status::OK();
case Type::BINARY: {
int32_t nchars;
const uint8_t* str =
std::static_pointer_cast<BinaryArray>(arr)->GetValue(index, &nchars);
*result = PyBytes_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
return CheckPyError();
}
case Type::STRING: {
int32_t nchars;
const uint8_t* str =
std::static_pointer_cast<StringArray>(arr)->GetValue(index, &nchars);
*result = PyUnicode_FromStringAndSize(reinterpret_cast<const char*>(str), nchars);
return CheckPyError();
}
case Type::FLOAT:
*result =
PyFloat_FromDouble(std::static_pointer_cast<FloatArray>(arr)->Value(index));
return Status::OK();
case Type::DOUBLE:
*result =
PyFloat_FromDouble(std::static_pointer_cast<DoubleArray>(arr)->Value(index));
return Status::OK();
case Type::STRUCT: {
auto s = std::static_pointer_cast<StructArray>(arr);
auto l = std::static_pointer_cast<ListArray>(s->field(0));
if (s->type()->child(0)->name() == "list") {
return DeserializeList(l->values(), l->value_offset(index),
l->value_offset(index + 1), base, tensors, result);
} else if (s->type()->child(0)->name() == "tuple") {
return DeserializeTuple(l->values(), l->value_offset(index),
l->value_offset(index + 1), base, tensors, result);
} else if (s->type()->child(0)->name() == "dict") {
return DeserializeDict(l->values(), l->value_offset(index),
l->value_offset(index + 1), base, tensors, result);
} else {
DCHECK(false) << "unexpected StructArray type " << s->type()->child(0)->name();
}
}
// We use an Int32Builder here to distinguish the tensor indices from
// the Type::INT64 above (see tensor_indices_ in SequenceBuilder).
case Type::INT32: {
return DeserializeArray(arr, index, base, tensors, result);
}
default:
DCHECK(false) << "union tag " << type << " not recognized";
}
return Status::OK();
}

#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \
auto data = std::dynamic_pointer_cast<UnionArray>(array); \
int64_t size = array->length(); \
ScopedRef result(CREATE_FN(stop_idx - start_idx)); \
auto types = std::make_shared<Int8Array>(size, data->type_ids()); \
auto offsets = std::make_shared<Int32Array>(size, data->value_offsets()); \
for (int64_t i = start_idx; i < stop_idx; ++i) { \
if (data->IsNull(i)) { \
Py_INCREF(Py_None); \
SET_ITEM_FN(result.get(), i - start_idx, Py_None); \
} else { \
int64_t offset = offsets->Value(i); \
int8_t type = types->Value(i); \
std::shared_ptr<Array> arr = data->child(type); \
PyObject* value; \
RETURN_NOT_OK(GetValue(arr, offset, type, base, tensors, &value)); \
SET_ITEM_FN(result.get(), i - start_idx, value); \
} \
} \
*out = result.release(); \
return Status::OK();

Status DeserializeList(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out) {
DESERIALIZE_SEQUENCE(PyList_New, PyList_SET_ITEM)
}

Status DeserializeTuple(std::shared_ptr<Array> array, int64_t start_idx, int64_t stop_idx,
PyObject* base,
const std::vector<std::shared_ptr<Tensor>>& tensors,
PyObject** out) {
DESERIALIZE_SEQUENCE(PyTuple_New, PyTuple_SET_ITEM)
}

Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
SerializedPyObject* out) {
std::shared_ptr<ipc::RecordBatchStreamReader> reader;
int64_t offset;
int64_t bytes_read;
int32_t num_tensors;
// Read number of tensors
RETURN_NOT_OK(
src->Read(sizeof(int32_t), &bytes_read, reinterpret_cast<uint8_t*>(&num_tensors)));
RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(src, &reader));
RETURN_NOT_OK(reader->ReadNextRecordBatch(&out->batch));
RETURN_NOT_OK(src->Tell(&offset));
offset += 4; // Skip the end-of-stream message
for (int i = 0; i < num_tensors; ++i) {
std::shared_ptr<Tensor> tensor;
RETURN_NOT_OK(ipc::ReadTensor(offset, src.get(), &tensor));
out->tensors.push_back(tensor);
RETURN_NOT_OK(src->Tell(&offset));
}
return Status::OK();
}

Status DeserializeObject(const SerializedPyObject& obj, PyObject* base, PyObject** out) {
PyAcquireGIL lock;
return DeserializeList(obj.batch->column(0), 0, obj.batch->num_rows(), base,
obj.tensors, out);
}

} // namespace py
} // namespace arrow
66 changes: 66 additions & 0 deletions cpp/src/arrow/python/arrow_to_python.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_PYTHON_ARROW_TO_PYTHON_H
#define ARROW_PYTHON_ARROW_TO_PYTHON_H

#include "arrow/python/platform.h"

#include <cstdint>
#include <memory>
#include <vector>

#include "arrow/python/python_to_arrow.h"
#include "arrow/status.h"
#include "arrow/util/visibility.h"

namespace arrow {

class RecordBatch;
class Tensor;

namespace io {

class RandomAccessFile;

} // namespace io

namespace py {

/// \brief Read serialized Python sequence from file interface using Arrow IPC
/// \param[in] src a RandomAccessFile
/// \param[out] out the reconstructed data
/// \return Status
ARROW_EXPORT
Status ReadSerializedObject(std::shared_ptr<io::RandomAccessFile> src,
SerializedPyObject* out);

/// \brief Reconstruct Python object from Arrow-serialized representation
/// \param[in] object
/// \param[in] base a Python object holding the underlying data that any NumPy
/// arrays will reference, to avoid premature deallocation
/// \param[out] out the returned object
/// \return Status
/// This acquires the GIL
ARROW_EXPORT
Status DeserializeObject(const SerializedPyObject& object, PyObject* base,
PyObject** out);

} // namespace py
} // namespace arrow

#endif // ARROW_PYTHON_ARROW_TO_PYTHON_H
Loading

0 comments on commit b50f235

Please sign in to comment.