Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ cpp/.idea/
python/.eggs/
.vscode
.idea/
.pytest_cache/
6 changes: 3 additions & 3 deletions cpp/src/arrow/python/arrow_to_pandas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ class PandasBlock {
int64_t num_rows_;
int num_columns_;

OwnedRef block_arr_;
OwnedRefNoGIL block_arr_;
uint8_t* block_data_;

PandasOptions options_;

// ndarray<int32>
OwnedRef placement_arr_;
OwnedRefNoGIL placement_arr_;
int64_t* placement_data_;

private:
Expand Down Expand Up @@ -1140,7 +1140,7 @@ class CategoricalBlock : public PandasBlock {
}

MemoryPool* pool_;
OwnedRef dictionary_;
OwnedRefNoGIL dictionary_;
bool ordered_;
bool needs_copy_;
};
Expand Down
38 changes: 19 additions & 19 deletions cpp/src/arrow/python/arrow_to_python.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
PyObject** out) {
const auto& data = static_cast<const StructArray&>(array);
ScopedRef keys, vals;
ScopedRef result(PyDict_New());
OwnedRef keys, vals;
OwnedRef result(PyDict_New());
RETURN_IF_PYERROR();

DCHECK_EQ(2, data.num_fields());
Expand All @@ -77,16 +77,16 @@ Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
for (int64_t i = start_idx; i < stop_idx; ++i) {
// PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
// The latter two steal references whereas PyDict_SetItem does not. So we need
// to make sure the reference count is decremented by letting the ScopedRef
// to make sure the reference count is decremented by letting the OwnedRef
// go out of scope at the end.
PyDict_SetItem(result.get(), PyList_GET_ITEM(keys.get(), i - start_idx),
PyList_GET_ITEM(vals.get(), i - start_idx));
PyDict_SetItem(result.obj(), PyList_GET_ITEM(keys.obj(), i - start_idx),
PyList_GET_ITEM(vals.obj(), i - start_idx));
}
static PyObject* py_type = PyUnicode_FromString("_pytype_");
if (PyDict_Contains(result.get(), py_type)) {
RETURN_NOT_OK(CallDeserializeCallback(context, result.get(), out));
if (PyDict_Contains(result.obj(), py_type)) {
RETURN_NOT_OK(CallDeserializeCallback(context, result.obj(), out));
} else {
*out = result.release();
*out = result.detach();
}
return Status::OK();
}
Expand All @@ -96,10 +96,10 @@ Status DeserializeArray(const Array& array, int64_t offset, PyObject* base,
int32_t index = static_cast<const Int32Array&>(array).Value(offset);
RETURN_NOT_OK(py::TensorToNdarray(*blobs.tensors[index], base, out));
// Mark the array as immutable
ScopedRef flags(PyObject_GetAttrString(*out, "flags"));
DCHECK(flags.get() != NULL) << "Could not mark Numpy array immutable";
OwnedRef flags(PyObject_GetAttrString(*out, "flags"));
DCHECK(flags.obj() != NULL) << "Could not mark Numpy array immutable";
Py_INCREF(Py_False);
int flag_set = PyObject_SetAttrString(flags.get(), "writeable", Py_False);
int flag_set = PyObject_SetAttrString(flags.obj(), "writeable", Py_False);
DCHECK(flag_set == 0) << "Could not mark Numpy array immutable";
return Status::OK();
}
Expand Down Expand Up @@ -184,23 +184,23 @@ Status GetValue(PyObject* context, const UnionArray& parent, const Array& arr,

#define DESERIALIZE_SEQUENCE(CREATE_FN, SET_ITEM_FN) \
const auto& data = static_cast<const UnionArray&>(array); \
ScopedRef result(CREATE_FN(stop_idx - start_idx)); \
OwnedRef result(CREATE_FN(stop_idx - start_idx)); \
const uint8_t* type_ids = data.raw_type_ids(); \
const int32_t* value_offsets = data.raw_value_offsets(); \
for (int64_t i = start_idx; i < stop_idx; ++i) { \
if (data.IsNull(i)) { \
Py_INCREF(Py_None); \
SET_ITEM_FN(result.get(), i - start_idx, Py_None); \
SET_ITEM_FN(result.obj(), i - start_idx, Py_None); \
} else { \
int64_t offset = value_offsets[i]; \
uint8_t type = type_ids[i]; \
PyObject* value; \
RETURN_NOT_OK(GetValue(context, data, *data.UnsafeChild(type), offset, type, base, \
blobs, &value)); \
SET_ITEM_FN(result.get(), i - start_idx, value); \
SET_ITEM_FN(result.obj(), i - start_idx, value); \
} \
} \
*out = result.release(); \
*out = result.detach(); \
return Status::OK()

Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx,
Expand All @@ -219,13 +219,13 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
PyObject** out) {
const auto& data = static_cast<const UnionArray&>(array);
ScopedRef result(PySet_New(nullptr));
OwnedRef result(PySet_New(nullptr));
const uint8_t* type_ids = data.raw_type_ids();
const int32_t* value_offsets = data.raw_value_offsets();
for (int64_t i = start_idx; i < stop_idx; ++i) {
if (data.IsNull(i)) {
Py_INCREF(Py_None);
if (PySet_Add(result.get(), Py_None) < 0) {
if (PySet_Add(result.obj(), Py_None) < 0) {
RETURN_IF_PYERROR();
}
} else {
Expand All @@ -234,12 +234,12 @@ Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
PyObject* value;
RETURN_NOT_OK(GetValue(context, data, *data.UnsafeChild(type), offset, type, base,
blobs, &value));
if (PySet_Add(result.get(), value) < 0) {
if (PySet_Add(result.obj(), value) < 0) {
RETURN_IF_PYERROR();
}
}
}
*out = result.release();
*out = result.detach();
return Status::OK();
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/python/builtin_convert.cc
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ static Status ConvertPySequenceReal(PyObject* obj, int64_t size,
PyAcquireGIL lock;

PyObject* seq;
ScopedRef tmp_seq_nanny;
OwnedRef tmp_seq_nanny;

std::shared_ptr<DataType> real_type;

Expand Down
52 changes: 17 additions & 35 deletions cpp/src/arrow/python/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,66 +61,48 @@ class ARROW_EXPORT PyAcquireGIL {

#define PYARROW_IS_PY2 PY_MAJOR_VERSION <= 2

// A RAII primitive that DECREFs the underlying PyObject* when it
// goes out of scope.
class ARROW_EXPORT OwnedRef {
public:
OwnedRef() : obj_(NULLPTR) {}

explicit OwnedRef(PyObject* obj) : obj_(obj) {}

~OwnedRef() {
PyAcquireGIL lock;
release();
}
~OwnedRef() { reset(); }

void reset(PyObject* obj) {
/// TODO(phillipc): Should we acquire the GIL here? It definitely needs to be
/// acquired,
/// but callers have probably already acquired it
Py_XDECREF(obj_);
obj_ = obj;
}

void release() {
Py_XDECREF(obj_);
obj_ = NULLPTR;
}

PyObject* obj() const { return obj_; }

private:
PyObject* obj_;
};

// This is different from OwnedRef in that it assumes that
// the GIL is held by the caller and doesn't decrement the
// reference count when release is called.
class ARROW_EXPORT ScopedRef {
public:
ScopedRef() : obj_(NULLPTR) {}
void reset() { reset(NULLPTR); }

explicit ScopedRef(PyObject* obj) : obj_(obj) {}

~ScopedRef() { Py_XDECREF(obj_); }

void reset(PyObject* obj) {
Py_XDECREF(obj_);
obj_ = obj;
}

PyObject* release() {
PyObject* detach() {
PyObject* result = obj_;
obj_ = NULLPTR;
return result;
}

PyObject* get() const { return obj_; }
PyObject* obj() const { return obj_; }

PyObject** ref() { return &obj_; }

private:
PyObject* obj_;
};

// Same as OwnedRef, but ensures the GIL is taken when it goes out of scope.
// This is for situations where the GIL is not always known to be held
// (e.g. if it is released in the middle of a function for performance reasons)
class ARROW_EXPORT OwnedRefNoGIL : public OwnedRef {
public:
~OwnedRefNoGIL() {
PyAcquireGIL lock;
reset();
}
};

struct ARROW_EXPORT PyObjectStringify {
OwnedRef tmp_obj;
const char* bytes;
Expand Down
20 changes: 10 additions & 10 deletions cpp/src/arrow/python/numpy_to_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1116,10 +1116,10 @@ Status LoopPySequence(PyObject* sequence, T func) {
}
}
} else if (PyObject_HasAttrString(sequence, "__iter__")) {
OwnedRef iter = OwnedRef(PyObject_GetIter(sequence));
OwnedRef iter(PyObject_GetIter(sequence));
PyObject* item;
while ((item = PyIter_Next(iter.obj()))) {
OwnedRef ref = OwnedRef(item);
OwnedRef ref(item);
RETURN_NOT_OK(func(ref.obj()));
}
} else {
Expand Down Expand Up @@ -1149,11 +1149,11 @@ Status LoopPySequenceWithMasks(PyObject* sequence,
}
}
} else if (PyObject_HasAttrString(sequence, "__iter__")) {
OwnedRef iter = OwnedRef(PyObject_GetIter(sequence));
OwnedRef iter(PyObject_GetIter(sequence));
PyObject* item;
int64_t i = 0;
while ((item = PyIter_Next(iter.obj()))) {
OwnedRef ref = OwnedRef(item);
OwnedRef ref(item);
RETURN_NOT_OK(func(ref.obj(), have_mask && mask_values[i]));
i++;
}
Expand Down Expand Up @@ -1476,20 +1476,20 @@ Status AppendUTF32(const char* data, int itemsize, int byteorder,
}
}

ScopedRef unicode_obj(PyUnicode_DecodeUTF32(data, actual_length * kNumPyUnicodeSize,
nullptr, &byteorder));
OwnedRef unicode_obj(PyUnicode_DecodeUTF32(data, actual_length * kNumPyUnicodeSize,
nullptr, &byteorder));
RETURN_IF_PYERROR();
ScopedRef utf8_obj(PyUnicode_AsUTF8String(unicode_obj.get()));
if (utf8_obj.get() == NULL) {
OwnedRef utf8_obj(PyUnicode_AsUTF8String(unicode_obj.obj()));
if (utf8_obj.obj() == NULL) {
PyErr_Clear();
return Status::Invalid("failed converting UTF32 to UTF8");
}

const int32_t length = static_cast<int32_t>(PyBytes_GET_SIZE(utf8_obj.get()));
const int32_t length = static_cast<int32_t>(PyBytes_GET_SIZE(utf8_obj.obj()));
if (builder->value_data_length() + length > kBinaryMemoryLimit) {
return Status::Invalid("Encoded string length exceeds maximum size (2GB)");
}
return builder->Append(PyBytes_AS_STRING(utf8_obj.get()), length);
return builder->Append(PyBytes_AS_STRING(utf8_obj.obj()), length);
}

} // namespace
Expand Down
42 changes: 21 additions & 21 deletions cpp/src/arrow/python/python_to_arrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,15 @@ Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* el
*result = NULL;
if (context == Py_None) {
std::stringstream ss;
ScopedRef repr(PyObject_Repr(elem));
OwnedRef repr(PyObject_Repr(elem));
RETURN_IF_PYERROR();
#if PY_MAJOR_VERSION >= 3
ScopedRef ascii(PyUnicode_AsASCIIString(repr.get()));
OwnedRef ascii(PyUnicode_AsASCIIString(repr.obj()));
RETURN_IF_PYERROR();
ss << "error while calling callback on " << PyBytes_AsString(ascii.get())
ss << "error while calling callback on " << PyBytes_AsString(ascii.obj())
<< ": handler not registered";
#else
ss << "error while calling callback on " << PyString_AsString(repr.get())
ss << "error while calling callback on " << PyString_AsString(repr.obj())
<< ": handler not registered";
#endif
return Status::SerializationError(ss.str());
Expand All @@ -386,8 +386,8 @@ Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* el

Status CallSerializeCallback(PyObject* context, PyObject* value,
PyObject** serialized_object) {
ScopedRef method_name(PyUnicode_FromString("_serialize_callback"));
RETURN_NOT_OK(CallCustomCallback(context, method_name.get(), value, serialized_object));
OwnedRef method_name(PyUnicode_FromString("_serialize_callback"));
RETURN_NOT_OK(CallCustomCallback(context, method_name.obj(), value, serialized_object));
if (!PyDict_Check(*serialized_object)) {
return Status::TypeError("serialization callback must return a valid dictionary");
}
Expand All @@ -396,8 +396,8 @@ Status CallSerializeCallback(PyObject* context, PyObject* value,

Status CallDeserializeCallback(PyObject* context, PyObject* value,
PyObject** deserialized_object) {
ScopedRef method_name(PyUnicode_FromString("_deserialize_callback"));
return CallCustomCallback(context, method_name.get(), value, deserialized_object);
OwnedRef method_name(PyUnicode_FromString("_deserialize_callback"));
return CallCustomCallback(context, method_name.obj(), value, deserialized_object);
}

Status SerializeDict(PyObject* context, std::vector<PyObject*> dicts,
Expand Down Expand Up @@ -493,9 +493,9 @@ Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
#if PY_MAJOR_VERSION >= 3
char* data = PyUnicode_AsUTF8AndSize(elem, &size);
#else
ScopedRef str(PyUnicode_AsUTF8String(elem));
char* data = PyString_AS_STRING(str.get());
size = PyString_GET_SIZE(str.get());
OwnedRef str(PyUnicode_AsUTF8String(elem));
char* data = PyString_AS_STRING(str.obj());
size = PyString_GET_SIZE(str.obj());
#endif
if (size > std::numeric_limits<int32_t>::max()) {
return Status::Invalid("Cannot writes bytes over 2GB");
Expand Down Expand Up @@ -585,15 +585,15 @@ Status SerializeSequences(PyObject* context, std::vector<PyObject*> sequences,
SequenceBuilder builder(nullptr);
std::vector<PyObject*> sublists, subtuples, subdicts, subsets;
for (const auto& sequence : sequences) {
ScopedRef iterator(PyObject_GetIter(sequence));
OwnedRef iterator(PyObject_GetIter(sequence));
RETURN_IF_PYERROR();
ScopedRef item;
OwnedRef item;
while (true) {
item.reset(PyIter_Next(iterator.get()));
if (!item.get()) {
item.reset(PyIter_Next(iterator.obj()));
if (!item.obj()) {
break;
}
RETURN_NOT_OK(Append(context, item.get(), &builder, &sublists, &subtuples,
RETURN_NOT_OK(Append(context, item.obj(), &builder, &sublists, &subtuples,
&subdicts, &subsets, blobs_out));
}
}
Expand Down Expand Up @@ -739,18 +739,18 @@ Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) {
PyAcquireGIL py_gil;

ScopedRef result(PyDict_New());
OwnedRef result(PyDict_New());
PyObject* buffers = PyList_New(0);

// TODO(wesm): Not sure how pedantic we need to be about checking the return
// values of these functions. There are other places where we do not check
// PyDict_SetItem/SetItemString return value, but these failures would be
// quite esoteric
PyDict_SetItemString(result.get(), "num_tensors",
PyDict_SetItemString(result.obj(), "num_tensors",
PyLong_FromSize_t(this->tensors.size()));
PyDict_SetItemString(result.get(), "num_buffers",
PyDict_SetItemString(result.obj(), "num_buffers",
PyLong_FromSize_t(this->buffers.size()));
PyDict_SetItemString(result.get(), "data", buffers);
PyDict_SetItemString(result.obj(), "data", buffers);
RETURN_IF_PYERROR();

Py_DECREF(buffers);
Expand Down Expand Up @@ -792,7 +792,7 @@ Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out
RETURN_NOT_OK(PushBuffer(buf));
}

*out = result.release();
*out = result.detach();
return Status::OK();
}

Expand Down
Loading