diff --git a/cpp/src/arrow/python/CMakeLists.txt b/cpp/src/arrow/python/CMakeLists.txt
index 0fdf81e7aa9d7..5ea2c75a5973d 100644
--- a/cpp/src/arrow/python/CMakeLists.txt
+++ b/cpp/src/arrow/python/CMakeLists.txt
@@ -46,12 +46,15 @@ set(ARROW_PYTHON_SRCS
   builtin_convert.cc
   common.cc
   config.cc
+  dict.cc
   helpers.cc
   init.cc
   io.cc
   numpy_convert.cc
   pandas_to_arrow.cc
+  python_to_arrow.cc
   pyarrow.cc
+  sequence.cc
 )
 
 set(ARROW_PYTHON_SHARED_LINK_LIBS
@@ -86,14 +89,17 @@ install(FILES
   builtin_convert.h
   common.h
   config.h
+  dict.h
   helpers.h
   init.h
   io.h
   numpy_convert.h
   numpy_interop.h
   pandas_to_arrow.h
+  python_to_arrow.h
   platform.h
   pyarrow.h
+  sequence.h
   type_traits.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/python")
diff --git a/cpp/src/arrow/python/dict.cc b/cpp/src/arrow/python/dict.cc
new file mode 100644
index 0000000000000..5b605240c2811
--- /dev/null
+++ b/cpp/src/arrow/python/dict.cc
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#include "dict.h" + +namespace arrow { + +Status DictBuilder::Finish(std::shared_ptr key_tuple_data, + std::shared_ptr key_dict_data, std::shared_ptr val_list_data, + std::shared_ptr val_tuple_data, std::shared_ptr val_dict_data, + std::shared_ptr* out) { + // lists and dicts can't be keys of dicts in Python, that is why for + // the keys we do not need to collect sublists + std::shared_ptr keys, vals; + RETURN_NOT_OK(keys_.Finish(nullptr, key_tuple_data, key_dict_data, &keys)); + RETURN_NOT_OK(vals_.Finish(val_list_data, val_tuple_data, val_dict_data, &vals)); + auto keys_field = std::make_shared("keys", keys->type()); + auto vals_field = std::make_shared("vals", vals->type()); + auto type = + std::make_shared(std::vector({keys_field, vals_field})); + std::vector> field_arrays({keys, vals}); + DCHECK(keys->length() == vals->length()); + out->reset(new StructArray(type, keys->length(), field_arrays)); + return Status::OK(); +} + +} // namespace arrow diff --git a/cpp/src/arrow/python/dict.h b/cpp/src/arrow/python/dict.h new file mode 100644 index 0000000000000..aeb0e4ac1f0da --- /dev/null +++ b/cpp/src/arrow/python/dict.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PYTHON_ARROW_DICT_H +#define PYTHON_ARROW_DICT_H + +#include + +#include "sequence.h" + +namespace arrow { + +/// Constructing dictionaries of key/value pairs. Sequences of +/// keys and values are built separately using a pair of +/// SequenceBuilders. The resulting Arrow representation +/// can be obtained via the Finish method. +class DictBuilder { + public: + DictBuilder(arrow::MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {} + + /// Builder for the keys of the dictionary + SequenceBuilder& keys() { return keys_; } + /// Builder for the values of the dictionary + SequenceBuilder& vals() { return vals_; } + + /// Construct an Arrow StructArray representing the dictionary. + /// Contains a field "keys" for the keys and "vals" for the values. + + /// \param list_data + /// List containing the data from nested lists in the value + /// list of the dictionary + /// + /// \param dict_data + /// List containing the data from nested dictionaries in the + /// value list of the dictionary + arrow::Status Finish(std::shared_ptr key_tuple_data, + std::shared_ptr key_dict_data, + std::shared_ptr val_list_data, + std::shared_ptr val_tuple_data, + std::shared_ptr val_dict_data, std::shared_ptr* out); + + private: + SequenceBuilder keys_; + SequenceBuilder vals_; +}; + +} // namespace arrow + +#endif // PYARROW_DICT_H diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc new file mode 100644 index 0000000000000..d88f85dd0cf9a --- /dev/null +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "python_to_arrow.h" + +#include + +#include "scalars.h" + +constexpr int32_t kMaxRecursionDepth = 100; + +extern "C" { + PyObject* pyarrow_serialize_callback = NULL; + PyObject* pyarrow_deserialize_callback = NULL; +} + +namespace arrow { + +Status append(PyObject* elem, SequenceBuilder& builder, std::vector& sublists, + std::vector& subtuples, std::vector& subdicts, + std::vector& tensors_out) { + // The bool case must precede the int case (PyInt_Check passes for bools) + if (PyBool_Check(elem)) { + RETURN_NOT_OK(builder.AppendBool(elem == Py_True)); + } else if (PyFloat_Check(elem)) { + RETURN_NOT_OK(builder.AppendDouble(PyFloat_AS_DOUBLE(elem))); + } else if (PyLong_Check(elem)) { + int overflow = 0; + int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow); + RETURN_NOT_OK(builder.AppendInt64(data)); + if (overflow) { return Status::NotImplemented("long overflow"); } +#if PY_MAJOR_VERSION < 3 + } else if (PyInt_Check(elem)) { + RETURN_NOT_OK(builder.AppendInt64(static_cast(PyInt_AS_LONG(elem)))); +#endif + } else if (PyBytes_Check(elem)) { + auto data = reinterpret_cast(PyBytes_AS_STRING(elem)); + auto size = PyBytes_GET_SIZE(elem); + RETURN_NOT_OK(builder.AppendBytes(data, size)); + } else if (PyUnicode_Check(elem)) { + Py_ssize_t size; +#if PY_MAJOR_VERSION >= 3 + char* data = PyUnicode_AsUTF8AndSize(elem, &size); + Status s = builder.AppendString(data, size); +#else + PyObject* str = PyUnicode_AsUTF8String(elem); + char* data = PyString_AS_STRING(str); + size = PyString_GET_SIZE(str); + Status s = builder.AppendString(data, 
size); + Py_XDECREF(str); +#endif + RETURN_NOT_OK(s); + } else if (PyList_Check(elem)) { + builder.AppendList(PyList_Size(elem)); + sublists.push_back(elem); + } else if (PyDict_Check(elem)) { + builder.AppendDict(PyDict_Size(elem)); + subdicts.push_back(elem); + } else if (PyTuple_CheckExact(elem)) { + builder.AppendTuple(PyTuple_Size(elem)); + subtuples.push_back(elem); + } else if (PyArray_IsScalar(elem, Generic)) { + RETURN_NOT_OK(AppendScalar(elem, builder)); + } else if (PyArray_Check(elem)) { + RETURN_NOT_OK(SerializeArray((PyArrayObject*)elem, builder, subdicts, tensors_out)); + } else if (elem == Py_None) { + RETURN_NOT_OK(builder.AppendNone()); + } else { + if (!pyarrow_serialize_callback) { + std::stringstream ss; + ss << "data type of " << PyBytes_AS_STRING(PyObject_Repr(elem)) + << " not recognized and custom serialization handler not registered"; + return Status::NotImplemented(ss.str()); + } else { + PyObject* arglist = Py_BuildValue("(O)", elem); + // The reference count of the result of the call to PyObject_CallObject + // must be decremented. This is done in SerializeDict in this file. 
+ PyObject* result = PyObject_CallObject(pyarrow_serialize_callback, arglist); + Py_XDECREF(arglist); + if (!result) { return Status::NotImplemented("python error"); } + builder.AppendDict(PyDict_Size(result)); + subdicts.push_back(result); + } + } + return Status::OK(); +} + +Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, + std::vector& subdicts, std::vector& tensors_out) { + int dtype = PyArray_TYPE(array); + switch (dtype) { + case NPY_BOOL: + case NPY_UINT8: + case NPY_INT8: + case NPY_UINT16: + case NPY_INT16: + case NPY_UINT32: + case NPY_INT32: + case NPY_UINT64: + case NPY_INT64: + case NPY_FLOAT: + case NPY_DOUBLE: { + RETURN_NOT_OK(builder.AppendTensor(tensors_out.size())); + tensors_out.push_back(reinterpret_cast(array)); + } break; + default: + if (!pyarrow_serialize_callback) { + std::stringstream stream; + stream << "numpy data type not recognized: " << dtype; + return Status::NotImplemented(stream.str()); + } else { + PyObject* arglist = Py_BuildValue("(O)", array); + // The reference count of the result of the call to PyObject_CallObject + // must be decremented. This is done in SerializeDict in python.cc. + PyObject* result = PyObject_CallObject(pyarrow_serialize_callback, arglist); + Py_XDECREF(arglist); + if (!result) { return Status::NotImplemented("python error"); } + builder.AppendDict(PyDict_Size(result)); + subdicts.push_back(result); + } + } + return Status::OK(); +} + +Status SerializeSequences(std::vector sequences, int32_t recursion_depth, + std::shared_ptr* out, std::vector& tensors_out) { + DCHECK(out); + if (recursion_depth >= kMaxRecursionDepth) { + return Status::NotImplemented( + "This object exceeds the maximum recursion depth. 
It may contain itself " + "recursively."); + } + SequenceBuilder builder(nullptr); + std::vector sublists, subtuples, subdicts; + for (const auto& sequence : sequences) { + PyObject* item; + PyObject* iterator = PyObject_GetIter(sequence); + while ((item = PyIter_Next(iterator))) { + Status s = append(item, builder, sublists, subtuples, subdicts, tensors_out); + Py_DECREF(item); + // if an error occurs, we need to decrement the reference counts before returning + if (!s.ok()) { + Py_DECREF(iterator); + return s; + } + } + Py_DECREF(iterator); + } + std::shared_ptr list; + if (sublists.size() > 0) { + RETURN_NOT_OK(SerializeSequences(sublists, recursion_depth + 1, &list, tensors_out)); + } + std::shared_ptr tuple; + if (subtuples.size() > 0) { + RETURN_NOT_OK( + SerializeSequences(subtuples, recursion_depth + 1, &tuple, tensors_out)); + } + std::shared_ptr dict; + if (subdicts.size() > 0) { + RETURN_NOT_OK(SerializeDict(subdicts, recursion_depth + 1, &dict, tensors_out)); + } + return builder.Finish(list, tuple, dict, out); +} + +Status SerializeDict(std::vector dicts, int32_t recursion_depth, + std::shared_ptr* out, std::vector& tensors_out) { + DictBuilder result; + if (recursion_depth >= kMaxRecursionDepth) { + return Status::NotImplemented( + "This object exceeds the maximum recursion depth. 
It may contain itself " + "recursively."); + } + std::vector key_tuples, key_dicts, val_lists, val_tuples, val_dicts, dummy; + for (const auto& dict : dicts) { + PyObject *key, *value; + Py_ssize_t pos = 0; + while (PyDict_Next(dict, &pos, &key, &value)) { + RETURN_NOT_OK( + append(key, result.keys(), dummy, key_tuples, key_dicts, tensors_out)); + DCHECK(dummy.size() == 0); + RETURN_NOT_OK( + append(value, result.vals(), val_lists, val_tuples, val_dicts, tensors_out)); + } + } + std::shared_ptr key_tuples_arr; + if (key_tuples.size() > 0) { + RETURN_NOT_OK(SerializeSequences( + key_tuples, recursion_depth + 1, &key_tuples_arr, tensors_out)); + } + std::shared_ptr key_dicts_arr; + if (key_dicts.size() > 0) { + RETURN_NOT_OK( + SerializeDict(key_dicts, recursion_depth + 1, &key_dicts_arr, tensors_out)); + } + std::shared_ptr val_list_arr; + if (val_lists.size() > 0) { + RETURN_NOT_OK( + SerializeSequences(val_lists, recursion_depth + 1, &val_list_arr, tensors_out)); + } + std::shared_ptr val_tuples_arr; + if (val_tuples.size() > 0) { + RETURN_NOT_OK(SerializeSequences( + val_tuples, recursion_depth + 1, &val_tuples_arr, tensors_out)); + } + std::shared_ptr val_dict_arr; + if (val_dicts.size() > 0) { + RETURN_NOT_OK( + SerializeDict(val_dicts, recursion_depth + 1, &val_dict_arr, tensors_out)); + } + result.Finish( + key_tuples_arr, key_dicts_arr, val_list_arr, val_tuples_arr, val_dict_arr, out); + + // This block is used to decrement the reference counts of the results + // returned by the serialization callback, which is called in SerializeArray + // in numpy.cc as well as in DeserializeDict and in append in this file. + static PyObject* py_type = PyUnicode_FromString("_pytype_"); + for (const auto& dict : dicts) { + if (PyDict_Contains(dict, py_type)) { + // If the dictionary contains the key "_pytype_", then the user has to + // have registered a callback. 
+ ARROW_CHECK(pyarrow_serialize_callback); + Py_XDECREF(dict); + } + } + + return Status::OK(); +} + +std::shared_ptr MakeBatch(std::shared_ptr data) { + auto field = std::make_shared("list", data->type()); + std::shared_ptr schema(new Schema({field})); + return std::shared_ptr(new RecordBatch(schema, data->length(), {data})); +} + +} // namespace arrow diff --git a/cpp/src/arrow/python/python_to_arrow.h b/cpp/src/arrow/python/python_to_arrow.h new file mode 100644 index 0000000000000..44232b5f4166a --- /dev/null +++ b/cpp/src/arrow/python/python_to_arrow.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_PYTHON_PYTHON_TO_ARROW_H +#define ARROW_PYTHON_PYTHON_TO_ARROW_H + +#include + +#include + +#include "dict.h" +#include "numpy_interop.h" +#include "sequence.h" + +extern "C" { +extern PyObject* pyarrow_serialize_callback; +extern PyObject* pyarrow_deserialize_callback; +} + +namespace arrow { + +arrow::Status SerializeSequences(std::vector sequences, + int32_t recursion_depth, std::shared_ptr* out, + std::vector& tensors_out); + +arrow::Status SerializeDict(std::vector dicts, int32_t recursion_depth, + std::shared_ptr* out, std::vector& tensors_out); + +arrow::Status SerializeArray(PyArrayObject* array, SequenceBuilder& builder, + std::vector& subdicts, std::vector& tensors_out); + +std::shared_ptr MakeBatch(std::shared_ptr data); + +} + +#endif // ARROW_PYTHON_PYTHON_TO_ARROW_H diff --git a/cpp/src/arrow/python/scalars.h b/cpp/src/arrow/python/scalars.h new file mode 100644 index 0000000000000..be4e89220f95d --- /dev/null +++ b/cpp/src/arrow/python/scalars.h @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_PYTHON_SCALARS_H +#define ARROW_PYTHON_SCALARS_H + +#include + +#include +#include "numpy_interop.h" +#include +#include + +#include "sequence.h" + +namespace arrow { + +Status AppendScalar(PyObject* obj, SequenceBuilder& builder) { + if (PyArray_IsScalar(obj, Bool)) { + return builder.AppendBool(((PyBoolScalarObject*)obj)->obval != 0); + } else if (PyArray_IsScalar(obj, Float)) { + return builder.AppendFloat(((PyFloatScalarObject*)obj)->obval); + } else if (PyArray_IsScalar(obj, Double)) { + return builder.AppendDouble(((PyDoubleScalarObject*)obj)->obval); + } + int64_t value = 0; + if (PyArray_IsScalar(obj, Byte)) { + value = ((PyByteScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, UByte)) { + value = ((PyUByteScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, Short)) { + value = ((PyShortScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, UShort)) { + value = ((PyUShortScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, Int)) { + value = ((PyIntScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, UInt)) { + value = ((PyUIntScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, Long)) { + value = ((PyLongScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, ULong)) { + value = ((PyULongScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, LongLong)) { + value = ((PyLongLongScalarObject*)obj)->obval; + } else if (PyArray_IsScalar(obj, ULongLong)) { + value = ((PyULongLongScalarObject*)obj)->obval; + } else { + DCHECK(false) << "scalar type not recognized"; + } + return builder.AppendInt64(value); +} + +} // namespace arrow + +#endif // PYTHON_ARROW_SCALARS_H diff --git a/cpp/src/arrow/python/sequence.cc b/cpp/src/arrow/python/sequence.cc new file mode 100644 index 0000000000000..86ab48949c21e --- /dev/null +++ b/cpp/src/arrow/python/sequence.cc @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "sequence.h" + +namespace arrow { + +SequenceBuilder::SequenceBuilder(MemoryPool* pool) + : pool_(pool), + types_(pool, std::make_shared()), + offsets_(pool, std::make_shared()), + nones_(pool, std::make_shared()), + bools_(pool, std::make_shared()), + ints_(pool, std::make_shared()), + bytes_(pool, std::make_shared()), + strings_(pool), + floats_(pool, std::make_shared()), + doubles_(pool, std::make_shared()), + tensor_indices_(pool, std::make_shared()), + list_offsets_({0}), + tuple_offsets_({0}), + dict_offsets_({0}) {} + +#define UPDATE(OFFSET, TAG) \ + if (TAG == -1) { \ + TAG = num_tags; \ + num_tags += 1; \ + } \ + RETURN_NOT_OK(offsets_.Append(OFFSET)); \ + RETURN_NOT_OK(types_.Append(TAG)); \ + RETURN_NOT_OK(nones_.AppendToBitmap(true)); + +Status SequenceBuilder::AppendNone() { + RETURN_NOT_OK(offsets_.Append(0)); + RETURN_NOT_OK(types_.Append(0)); + return nones_.AppendToBitmap(false); +} + +Status SequenceBuilder::AppendBool(bool data) { + UPDATE(bools_.length(), bool_tag); + return bools_.Append(data); +} + +Status SequenceBuilder::AppendInt64(int64_t data) { + UPDATE(ints_.length(), int_tag); + return ints_.Append(data); +} + +Status SequenceBuilder::AppendUInt64(uint64_t data) { + UPDATE(ints_.length(), int_tag); + return ints_.Append(data); +} + 
+Status SequenceBuilder::AppendBytes(const uint8_t* data, int32_t length) { + UPDATE(bytes_.length(), bytes_tag); + return bytes_.Append(data, length); +} + +Status SequenceBuilder::AppendString(const char* data, int32_t length) { + UPDATE(strings_.length(), string_tag); + return strings_.Append(data, length); +} + +Status SequenceBuilder::AppendFloat(float data) { + UPDATE(floats_.length(), float_tag); + return floats_.Append(data); +} + +Status SequenceBuilder::AppendDouble(double data) { + UPDATE(doubles_.length(), double_tag); + return doubles_.Append(data); +} + +Status SequenceBuilder::AppendTensor(int32_t tensor_index) { + UPDATE(tensor_indices_.length(), tensor_tag); + return tensor_indices_.Append(tensor_index); +} + +Status SequenceBuilder::AppendList(int32_t size) { + UPDATE(list_offsets_.size() - 1, list_tag); + list_offsets_.push_back(list_offsets_.back() + size); + return Status::OK(); +} + +Status SequenceBuilder::AppendTuple(int32_t size) { + UPDATE(tuple_offsets_.size() - 1, tuple_tag); + tuple_offsets_.push_back(tuple_offsets_.back() + size); + return Status::OK(); +} + +Status SequenceBuilder::AppendDict(int32_t size) { + UPDATE(dict_offsets_.size() - 1, dict_tag); + dict_offsets_.push_back(dict_offsets_.back() + size); + return Status::OK(); +} + +#define ADD_ELEMENT(VARNAME, TAG) \ + if (TAG != -1) { \ + types[TAG] = std::make_shared("", VARNAME.type()); \ + RETURN_NOT_OK(VARNAME.Finish(&children[TAG])); \ + RETURN_NOT_OK(nones_.AppendToBitmap(true)); \ + type_ids.push_back(TAG); \ + } + +#define ADD_SUBSEQUENCE(DATA, OFFSETS, BUILDER, TAG, NAME) \ + if (DATA) { \ + DCHECK(DATA->length() == OFFSETS.back()); \ + std::shared_ptr offset_array; \ + Int32Builder builder(pool_, std::make_shared()); \ + RETURN_NOT_OK(builder.Append(OFFSETS.data(), OFFSETS.size())); \ + RETURN_NOT_OK(builder.Finish(&offset_array)); \ + std::shared_ptr list_array; \ + ListArray::FromArrays(*offset_array, *DATA, pool_, &list_array); \ + auto field = 
std::make_shared(NAME, list_array->type()); \ + auto type = std::make_shared(std::vector({field})); \ + types[TAG] = std::make_shared("", type); \ + children[TAG] = std::shared_ptr( \ + new StructArray(type, list_array->length(), {list_array})); \ + RETURN_NOT_OK(nones_.AppendToBitmap(true)); \ + type_ids.push_back(TAG); \ + } else { \ + DCHECK(OFFSETS.size() == 1); \ + } + +Status SequenceBuilder::Finish(std::shared_ptr list_data, + std::shared_ptr tuple_data, std::shared_ptr dict_data, + std::shared_ptr* out) { + std::vector> types(num_tags); + std::vector> children(num_tags); + std::vector type_ids; + + ADD_ELEMENT(bools_, bool_tag); + ADD_ELEMENT(ints_, int_tag); + ADD_ELEMENT(strings_, string_tag); + ADD_ELEMENT(bytes_, bytes_tag); + ADD_ELEMENT(floats_, float_tag); + ADD_ELEMENT(doubles_, double_tag); + + ADD_ELEMENT(tensor_indices_, tensor_tag); + + ADD_SUBSEQUENCE(list_data, list_offsets_, list_builder, list_tag, "list"); + ADD_SUBSEQUENCE(tuple_data, tuple_offsets_, tuple_builder, tuple_tag, "tuple"); + ADD_SUBSEQUENCE(dict_data, dict_offsets_, dict_builder, dict_tag, "dict"); + + TypePtr type = TypePtr(new UnionType(types, type_ids, UnionMode::DENSE)); + out->reset(new UnionArray(type, types_.length(), children, types_.data(), + offsets_.data(), nones_.null_bitmap(), nones_.null_count())); + return Status::OK(); +} + +} // namespace arrow diff --git a/cpp/src/arrow/python/sequence.h b/cpp/src/arrow/python/sequence.h new file mode 100644 index 0000000000000..803b4811ad990 --- /dev/null +++ b/cpp/src/arrow/python/sequence.h @@ -0,0 +1,139 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PYTHON_ARROW_SEQUENCE_H +#define PYTHON_ARROW_SEQUENCE_H + +#include +#include + +namespace arrow { + +class NullArrayBuilder : public arrow::ArrayBuilder { + public: + explicit NullArrayBuilder(arrow::MemoryPool* pool, const arrow::TypePtr& type) + : arrow::ArrayBuilder(pool, type) {} + virtual ~NullArrayBuilder(){}; + arrow::Status Finish(std::shared_ptr* out) override { + return arrow::Status::OK(); + } +}; + +/// A Sequence is a heterogeneous collections of elements. It can contain +/// scalar Python types, lists, tuples, dictionaries and tensors. +class SequenceBuilder { + public: + SequenceBuilder(arrow::MemoryPool* pool = nullptr); + + /// Appending a none to the sequence + arrow::Status AppendNone(); + + /// Appending a boolean to the sequence + arrow::Status AppendBool(bool data); + + /// Appending an int64_t to the sequence + arrow::Status AppendInt64(int64_t data); + + /// Appending an uint64_t to the sequence + arrow::Status AppendUInt64(uint64_t data); + + /// Append a list of bytes to the sequence + arrow::Status AppendBytes(const uint8_t* data, int32_t length); + + /// Appending a string to the sequence + arrow::Status AppendString(const char* data, int32_t length); + + /// Appending a float to the sequence + arrow::Status AppendFloat(float data); + + /// Appending a double to the sequence + arrow::Status AppendDouble(double data); + + /// Appending a tensor to the sequence + /// + /// \param tensor_index Index of the tensor in the object. 
+ arrow::Status AppendTensor(int32_t tensor_index); + + /// Add a sublist to the sequence. The data contained in the sublist will be + /// specified in the "Finish" method. + /// + /// To construct l = [[11, 22], 33, [44, 55]] you would for example run + /// list = ListBuilder(); + /// list.AppendList(2); + /// list.Append(33); + /// list.AppendList(2); + /// list.Finish([11, 22, 44, 55]); + /// list.Finish(); + + /// \param size + /// The size of the sublist + arrow::Status AppendList(int32_t size); + + arrow::Status AppendTuple(int32_t size); + + arrow::Status AppendDict(int32_t size); + + /// Finish building the sequence and return the result. + arrow::Status Finish(std::shared_ptr list_data, + std::shared_ptr tuple_data, std::shared_ptr dict_data, + std::shared_ptr* out); + + private: + arrow::MemoryPool* pool_; + + arrow::Int8Builder types_; + arrow::Int32Builder offsets_; + + /// Total number of bytes needed to represent this sequence. + int64_t total_num_bytes_; + + NullArrayBuilder nones_; + arrow::BooleanBuilder bools_; + arrow::Int64Builder ints_; + arrow::BinaryBuilder bytes_; + arrow::StringBuilder strings_; + arrow::FloatBuilder floats_; + arrow::DoubleBuilder doubles_; + + // We use an Int32Builder here to distinguish the tensor indices from + // the ints_ above (see the case Type::INT32 in get_value in python.cc). + // TODO(pcm): Replace this by using the union tags to distinguish between + // these two cases. 
+ arrow::Int32Builder tensor_indices_; + + std::vector list_offsets_; + std::vector tuple_offsets_; + std::vector dict_offsets_; + + int8_t bool_tag = -1; + int8_t int_tag = -1; + int8_t string_tag = -1; + int8_t bytes_tag = -1; + int8_t float_tag = -1; + int8_t double_tag = -1; + + int8_t tensor_tag = -1; + int8_t list_tag = -1; + int8_t tuple_tag = -1; + int8_t dict_tag = -1; + + int8_t num_tags = 0; +}; + +} // namespace arrow + +#endif // PYTHON_ARROW_SEQUENCE_H diff --git a/python/pyarrow/lib.pyx b/python/pyarrow/lib.pyx index 789801b9f06a9..4ea327ef9264c 100644 --- a/python/pyarrow/lib.pyx +++ b/python/pyarrow/lib.pyx @@ -119,5 +119,8 @@ include "ipc.pxi" # Feather format include "feather.pxi" +# Python serialization +include "serialization.pxi" + # Public API include "public-api.pxi" diff --git a/python/pyarrow/serialization.pxi b/python/pyarrow/serialization.pxi new file mode 100644 index 0000000000000..7513e982b016e --- /dev/null +++ b/python/pyarrow/serialization.pxi @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+
+from libcpp cimport bool as c_bool, nullptr
+from libcpp.vector cimport vector as c_vector
+from cpython.ref cimport PyObject
+
+from pyarrow.lib cimport Buffer, NativeFile, check_status
+
+cdef extern from "arrow/python/python_to_arrow.h" nogil:
+
+    cdef CStatus SerializeSequences(c_vector[PyObject*] sequences,
+        int32_t recursion_depth, shared_ptr[CArray]* array_out,
+        c_vector[PyObject*]& tensors_out)
+
+    cdef shared_ptr[CRecordBatch] MakeBatch(shared_ptr[CArray] data)
+
+cdef class PythonObject:
+    # Holds the serialized representation: a record batch plus the
+    # tensors referenced from it.
+
+    cdef:
+        shared_ptr[CRecordBatch] batch
+        c_vector[shared_ptr[CTensor]] tensors
+
+    def __cinit__(self):
+        pass
+
+def serialize_list(object value):
+    """Serialize a Python sequence into a PythonObject (record batch
+    plus the numpy tensors encountered during traversal)."""
+    cdef int32_t recursion_depth = 0
+    cdef PythonObject result = PythonObject()
+    cdef c_vector[PyObject*] sequences
+    cdef shared_ptr[CArray] array
+    cdef c_vector[PyObject*] tensors
+    cdef PyObject* tensor
+    cdef shared_ptr[CTensor] out
+    # Cast restores the PyObject* borrow expected by the C++ API.
+    sequences.push_back(<PyObject*> value)
+    check_status(SerializeSequences(sequences, recursion_depth, &array, tensors))
+    result.batch = MakeBatch(array)
+    for tensor in tensors:
+        check_status(NdarrayToTensor(c_default_memory_pool(), <object> tensor, &out))
+        result.tensors.push_back(out)
+    return result