Skip to content

Commit

Permalink
Fix passing empty bytes in python tasks (#7045)
Browse files Browse the repository at this point in the history
* ensure data_ won't be nullptr when size == 0

* when data_sizes[i] == 0, we should allocate an empty buffer

* workaround for pyarrow.py_buffer

* fix comments

* add nullptr check

* add test for bytes

* lint
  • Loading branch information
chaokunyang authored Feb 10, 2020
1 parent 694c0f2 commit 247a4d0
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 6 deletions.
11 changes: 7 additions & 4 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -566,10 +566,13 @@ cdef write_serialized_object(
from ray.serialization import Pickle5SerializedObject, RawSerializedObject

if isinstance(serialized_object, RawSerializedObject):
buffer = Buffer.make(buf)
stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer))
stream.set_memcopy_threads(MEMCOPY_THREADS)
stream.write(pyarrow.py_buffer(serialized_object.value))
if buf.get() != NULL and buf.get().Size() > 0:
buffer = Buffer.make(buf)
# `Buffer` has a nullptr underlying buffer if size is 0,
# which would cause `pyarrow.py_buffer` to crash
stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer))
stream.set_memcopy_threads(MEMCOPY_THREADS)
stream.write(pyarrow.py_buffer(serialized_object.value))
elif isinstance(serialized_object, Pickle5SerializedObject):
(<Pickle5Writer>serialized_object.writer).write_to(
serialized_object.inband, buf, MEMCOPY_THREADS)
Expand Down
2 changes: 2 additions & 0 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def test_simple_serialization(ray_start_regular):
0.9,
1 << 62,
1 << 999,
b"",
b"a",
"a",
string.printable,
"\u262F",
Expand Down
3 changes: 2 additions & 1 deletion src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
if (!return_objects->at(i)) {
continue;
}
if (return_objects->at(i)->GetData()->IsPlasmaBuffer()) {
if (return_objects->at(i)->GetData() != nullptr &&
return_objects->at(i)->GetData()->IsPlasmaBuffer()) {
if (!Seal(return_ids[i], /*pin_object=*/false).ok()) {
RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object "
<< return_ids[i] << " in store: " << status.message();
Expand Down
2 changes: 1 addition & 1 deletion src/ray/core_worker/transport/direct_actor_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(

// The object is nullptr if it already existed in the object store.
const auto &result = return_objects[i];
if (result == nullptr || result->GetData()->IsPlasmaBuffer()) {
if (result->GetData() != nullptr && result->GetData()->IsPlasmaBuffer()) {
return_object->set_in_plasma(true);
plasma_return_ids.push_back(id);
} else {
Expand Down

0 comments on commit 247a4d0

Please sign in to comment.