17 changes: 9 additions & 8 deletions cpp/src/arrow/python/deserialize.cc
@@ -361,7 +361,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu

ipc::Message message(metadata, body);

-    RETURN_NOT_OK(ReadTensor(message, &tensor));
+    RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
out->tensors.emplace_back(std::move(tensor));
}

@@ -375,7 +375,7 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu

ipc::Message message(metadata, body);

-    RETURN_NOT_OK(ReadTensor(message, &tensor));
+    RETURN_NOT_OK(ipc::ReadTensor(message, &tensor));
out->ndarrays.emplace_back(std::move(tensor));
}

@@ -389,19 +389,20 @@ Status GetSerializedFromComponents(int num_tensors, int num_ndarrays, int num_bu
return Status::OK();
}

-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out) {
-  if (object.tensors.size() != 1) {
-    return Status::Invalid("Object is not a Tensor");
+Status DeserializeNdarray(const SerializedPyObject& object,
+                          std::shared_ptr<Tensor>* out) {
+  if (object.ndarrays.size() != 1) {
+    return Status::Invalid("Object is not an Ndarray");
   }
-  *out = object.tensors[0];
+  *out = object.ndarrays[0];
return Status::OK();
}

-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
io::BufferReader reader(src);
SerializedPyObject object;
RETURN_NOT_OK(ReadSerializedObject(&reader, &object));
-  return DeserializeTensor(object, out);
+  return DeserializeNdarray(object, out);
}

} // namespace py
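[Reviewer note: the rename is mechanical, but it touches the public API surface. As a sanity check on how the renamed entry points compose, here is a minimal round-trip sketch — not code from this PR. RoundTripNdarray is a hypothetical helper, and the Status-returning BufferOutputStream::Create/Finish calls are assumed from this era's Arrow API.]

```cpp
#include "arrow/api.h"
#include "arrow/io/memory.h"
#include "arrow/python/deserialize.h"
#include "arrow/python/serialize.h"

// Hypothetical helper: serialize a tensor with the renamed API, then read
// it back through NdarrayFromBuffer.
arrow::Status RoundTripNdarray(const std::shared_ptr<arrow::Tensor>& tensor,
                               std::shared_ptr<arrow::Tensor>* out) {
  arrow::py::SerializedPyObject serialized;
  RETURN_NOT_OK(arrow::py::SerializeNdarray(tensor, &serialized));

  // Write the serialized payload into an in-memory buffer.
  std::shared_ptr<arrow::io::BufferOutputStream> stream;
  RETURN_NOT_OK(arrow::io::BufferOutputStream::Create(
      1024, arrow::default_memory_pool(), &stream));
  RETURN_NOT_OK(serialized.WriteTo(stream.get()));
  std::shared_ptr<arrow::Buffer> buffer;
  RETURN_NOT_OK(stream->Finish(&buffer));

  // The entry point formerly known as arrow::py::ReadTensor.
  return arrow::py::NdarrayFromBuffer(buffer, out);
}
```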
6 changes: 3 additions & 3 deletions cpp/src/arrow/python/deserialize.h
@@ -76,15 +76,15 @@ ARROW_EXPORT
Status DeserializeObject(PyObject* context, const SerializedPyObject& object,
PyObject* base, PyObject** out);

-/// \brief Reconstruct Tensor from Arrow-serialized representation
+/// \brief Reconstruct Ndarray from Arrow-serialized representation
/// \param[in] object Object to deserialize
/// \param[out] out The deserialized tensor
/// \return Status
ARROW_EXPORT
-Status DeserializeTensor(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);
+Status DeserializeNdarray(const SerializedPyObject& object, std::shared_ptr<Tensor>* out);

ARROW_EXPORT
-Status ReadTensor(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);
+Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out);

} // namespace py
} // namespace arrow
14 changes: 7 additions & 7 deletions cpp/src/arrow/python/serialize.cc
@@ -752,23 +752,23 @@ Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject
return Status::OK();
}

-Status SerializeTensor(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
+Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
std::shared_ptr<Array> array;
SequenceBuilder builder;
-  RETURN_NOT_OK(builder.AppendTensor(static_cast<int32_t>(out->tensors.size())));
-  out->tensors.push_back(tensor);
+  RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
+  out->ndarrays.push_back(tensor);
RETURN_NOT_OK(builder.Finish(nullptr, nullptr, nullptr, nullptr, &array));
out->batch = MakeBatch(array);
return Status::OK();
}

-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
-                         const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
-                         io::OutputStream* dst) {
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+                          io::OutputStream* dst) {
auto empty_tensor = std::make_shared<Tensor>(
dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
SerializedPyObject serialized_tensor;
-  RETURN_NOT_OK(SerializeTensor(empty_tensor, &serialized_tensor));
+  RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
return serialized_tensor.WriteTo(dst);
}

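[Reviewer note: the null-buffer trick in WriteNdarrayHeader is what makes it cheap — serializing an empty Tensor that merely claims tensor_num_bytes emits the same metadata as a real write, without allocating a body. Condensed below is the two-pass pattern the TensorFlow op in this PR builds on top of it; the variables (arrow_dtype, shape, total_bytes, data_buffer) are as in plasma_op.cc.]

```cpp
// Pass 1: MockOutputStream discards the bytes but counts them, which
// yields the exact header size before any allocation happens.
arrow::io::MockOutputStream mock;
ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape,
                                             /*tensor_num_bytes=*/0, &mock));
int64_t header_size = mock.GetExtentBytesWritten();

// Pass 2: write the real header at the front of a buffer sized
// header_size + total_bytes; the tensor body is then copied in directly
// behind it, starting at `offset`.
arrow::io::FixedSizeBufferWriter buf(data_buffer);
ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape,
                                             total_bytes, &buf));
int64_t offset;
ARROW_CHECK_OK(buf.Tell(&offset));
```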
6 changes: 3 additions & 3 deletions cpp/src/arrow/python/serialize.h
@@ -103,9 +103,9 @@ Status SerializeTensor(std::shared_ptr<Tensor> tensor, py::SerializedPyObject* o
/// \param[in] dst The OutputStream to write the Tensor header to
/// \return Status
ARROW_EXPORT
-Status WriteTensorHeader(std::shared_ptr<DataType> dtype,
-                         const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
-                         io::OutputStream* dst);
+Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
+                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
+                          io::OutputStream* dst);

} // namespace py

48 changes: 32 additions & 16 deletions python/pyarrow/tensorflow/plasma_op.cc
@@ -77,8 +77,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                   plasma_manager_socket_name_,
-                                   plasma::kPlasmaDefaultReleaseDelay));
+                                   plasma_manager_socket_name_, 0));
VLOG(1) << "Connected!";
connected_ = true;
}
@@ -141,7 +140,7 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {
std::vector<int64_t> shape = {total_bytes / byte_width};

arrow::io::MockOutputStream mock;
-    ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, 0, &mock));
+    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, 0, &mock));
int64_t header_size = mock.GetExtentBytesWritten();

std::shared_ptr<Buffer> data_buffer;
@@ -153,15 +152,21 @@ class TensorToPlasmaOp : public tf::AsyncOpKernel {

int64_t offset;
arrow::io::FixedSizeBufferWriter buf(data_buffer);
-    ARROW_CHECK_OK(arrow::py::WriteTensorHeader(arrow_dtype, shape, total_bytes, &buf));
+    ARROW_CHECK_OK(arrow::py::WriteNdarrayHeader(arrow_dtype, shape, total_bytes, &buf));
ARROW_CHECK_OK(buf.Tell(&offset));

uint8_t* data = reinterpret_cast<uint8_t*>(data_buffer->mutable_data() + offset);

-    auto wrapped_callback = [this, context, done, data_buffer, object_id]() {
+    auto wrapped_callback = [this, context, done, data_buffer, data, object_id]() {
{
tf::mutex_lock lock(mu_);
ARROW_CHECK_OK(client_.Seal(object_id));
ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+        auto orig_stream = context->op_device_context()->stream();
+        auto stream_executor = orig_stream->parent();
+        CHECK(stream_executor->HostMemoryUnregister(static_cast<void*>(data)));
[Inline review thread on the HostMemoryUnregister line above]
Contributor Author: We probably only want this to happen in the GPU case, right?
Contributor Author: Same with the other op.
Contributor: Yeah, I updated it (it actually doesn't compile for CPU-only TensorFlow).
+#endif
}
context->SetStatus(tensorflow::Status::OK());
done();
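[Reviewer note: for readers unfamiliar with StreamExecutor — HostMemoryRegister/HostMemoryUnregister pin and unpin host pages around the asynchronous DMA, which is why the unregister must wait for the completion callback rather than run inline. A minimal standalone sketch of the same discipline follows, using the plain CUDA runtime API as a stand-in; that substitution is an assumption, since the op itself goes through TensorFlow's StreamExecutor.]

```cpp
#include <cuda_runtime.h>
#include <cstdio>
#include <cstdlib>

static void CudaCheck(cudaError_t err, const char* what) {
  if (err != cudaSuccess) {
    std::fprintf(stderr, "%s failed: %s\n", what, cudaGetErrorString(err));
    std::abort();
  }
}

// Copy a device tensor into an externally allocated host buffer (here, a
// plasma buffer): pin the host pages, issue the async copy, and unpin only
// once the copy has drained. The op defers the unregister to an
// event-manager callback; cudaStreamSynchronize is the simplest correct
// stand-in for that here.
void CopyDeviceToHostPinned(const void* device_src, void* host_dst,
                            size_t nbytes, cudaStream_t stream) {
  CudaCheck(cudaHostRegister(host_dst, nbytes, cudaHostRegisterDefault),
            "cudaHostRegister");
  CudaCheck(cudaMemcpyAsync(host_dst, device_src, nbytes,
                            cudaMemcpyDeviceToHost, stream),
            "cudaMemcpyAsync");
  CudaCheck(cudaStreamSynchronize(stream), "cudaStreamSynchronize");
  CudaCheck(cudaHostUnregister(host_dst), "cudaHostUnregister");
}
```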
@@ -244,8 +249,7 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
if (!connected_) {
VLOG(1) << "Connecting to Plasma...";
ARROW_CHECK_OK(client_.Connect(plasma_store_socket_name_,
-                                   plasma_manager_socket_name_,
-                                   plasma::kPlasmaDefaultReleaseDelay));
+                                   plasma_manager_socket_name_, 0));
VLOG(1) << "Connected!";
connected_ = true;
}
@@ -284,25 +288,39 @@ class PlasmaToTensorOp : public tf::AsyncOpKernel {
/*timeout_ms=*/-1, &object_buffer));
}

-    std::shared_ptr<arrow::Tensor> tensor;
-    ARROW_CHECK_OK(arrow::py::ReadTensor(object_buffer.data, &tensor));
+    std::shared_ptr<arrow::Tensor> ndarray;
+    ARROW_CHECK_OK(arrow::py::NdarrayFromBuffer(object_buffer.data, &ndarray));

-    int64_t byte_width = get_byte_width(*tensor->type());
-    const int64_t size_in_bytes = tensor->data()->size();
+    int64_t byte_width = get_byte_width(*ndarray->type());
+    const int64_t size_in_bytes = ndarray->data()->size();

tf::TensorShape shape({static_cast<int64_t>(size_in_bytes / byte_width)});

-    const float* plasma_data = reinterpret_cast<const float*>(tensor->raw_data());
+    const float* plasma_data = reinterpret_cast<const float*>(ndarray->raw_data());

tf::Tensor* output_tensor = nullptr;
OP_REQUIRES_OK_ASYNC(context, context->allocate_output(0, shape, &output_tensor),
done);

+    auto wrapped_callback = [this, context, done, plasma_data, object_id]() {
+      {
+        tf::mutex_lock lock(mu_);
+        ARROW_CHECK_OK(client_.Release(object_id));
+#ifdef GOOGLE_CUDA
+        auto orig_stream = context->op_device_context()->stream();
+        auto stream_executor = orig_stream->parent();
+        CHECK(stream_executor->HostMemoryUnregister(
+            const_cast<void*>(static_cast<const void*>(plasma_data))));
+#endif
+      }
+      done();
+    };

if (std::is_same<Device, CPUDevice>::value) {
std::memcpy(
reinterpret_cast<void*>(const_cast<char*>(output_tensor->tensor_data().data())),
plasma_data, size_in_bytes);
-      done();
+      wrapped_callback();
} else {
#ifdef GOOGLE_CUDA
auto orig_stream = context->op_device_context()->stream();
@@ ... @@
}

// Important. See note in T2P op.
-    // We don't check the return status since the host memory might've been
-    // already registered (e.g., the TensorToPlasmaOp might've been run).
CHECK(stream_executor->HostMemoryRegister(
const_cast<void*>(static_cast<const void*>(plasma_data)),
static_cast<tf::uint64>(size_in_bytes)));
@@ ... @@
CHECK(orig_stream->ThenWaitFor(h2d_stream).ok());

context->device()->tensorflow_gpu_device_info()->event_mgr->ThenExecute(
-        h2d_stream, std::move(done));
+        h2d_stream, std::move(wrapped_callback));
#endif
}
}
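[Reviewer note: the through-line of the op changes is that every plasma Get/Create is now paired with a Release on the completion path, so store-side reference counts can actually reach zero — which the updated test below asserts. A minimal sketch of that pairing against the C++ plasma client; ReadAndRelease is a hypothetical helper, and the Get/Release signatures are assumed from this era's API, matching the call shapes visible in the diff.]

```cpp
#include "arrow/status.h"
#include "plasma/client.h"

// Fetch a sealed object, use it, and drop the client-side reference.
// Skipping the Release is exactly the leak this PR fixes: the store-side
// ref_count would stay above zero forever.
arrow::Status ReadAndRelease(plasma::PlasmaClient* client,
                             const plasma::ObjectID& object_id) {
  plasma::ObjectBuffer object_buffer;
  // timeout_ms = -1 blocks until the object has been sealed.
  RETURN_NOT_OK(client->Get(&object_id, 1, /*timeout_ms=*/-1, &object_buffer));

  // ... consume object_buffer.data here ...

  return client->Release(object_id);
}
```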
5 changes: 4 additions & 1 deletion python/pyarrow/tests/test_plasma_tf_op.py
@@ -70,7 +70,6 @@ def FromPlasma():
# Try getting the data from Python
plasma_object_id = plasma.ObjectID(object_id)
obj = client.get(plasma_object_id)
-    obj = obj.to_numpy()

# Deserialized Tensor should be 64-byte aligned.
assert obj.ctypes.data % 64 == 0
@@ -100,3 +99,7 @@ def test_plasma_tf_op(use_gpu=False):
np.int8, np.int16, np.int32, np.int64]:
run_tensorflow_test_with_dtype(tf, plasma, plasma_store_name,
client, use_gpu, dtype)

+    # Make sure the objects have been released.
+    for _, info in client.list().items():
+        assert info['ref_count'] == 0