Skip to content

Commit

Permalink
[Core] release GIL when running parallel_memcopy() / memcpy() dur…
Browse files Browse the repository at this point in the history
…ing serializations (ray-project#22492)

While investigating ray-project#22161, it is observed GIL is held for an extended amount of time (up to 1000s) with stack trace [1]. It is possible either there are many iterations within `Pickle5Writer.write_to()` calling `ray::parallel_memcopy()`, or a few `ray::parallel_memcopy()` taking a long time (less likely). Either way, `ray::parallel_memcopy()` or `std::memcpy()` should not hold GIL.
  • Loading branch information
mwtian authored Feb 18, 2022
1 parent 9525618 commit 5a4c6d2
Showing 1 changed file with 24 additions and 20 deletions.
44 changes: 24 additions & 20 deletions python/ray/includes/serialization.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ cdef class Pickle5Writer:
(<int64_t*>ptr)[1] = protobuf_size
# Write inband data.
ptr += sizeof(int64_t) * 2
memcpy(ptr, &inband[0], len(inband))
with nogil:
memcpy(ptr, &inband[0], len(inband))
# Write protobuf data.
ptr += len(inband)
self.python_object.SerializeWithCachedSizesToArray(ptr)
Expand All @@ -375,14 +376,15 @@ cdef class Pickle5Writer:
for i in range(self.python_object.buffer_size()):
buffer_addr = self.python_object.buffer(i).address()
buffer_len = self.python_object.buffer(i).length()
if (memcopy_threads > 1 and
buffer_len > kMemcopyDefaultThreshold):
parallel_memcopy(ptr + buffer_addr,
<const uint8_t*> self.buffers[i].buf,
buffer_len,
kMemcopyDefaultBlocksize, memcopy_threads)
else:
memcpy(ptr + buffer_addr, self.buffers[i].buf, buffer_len)
with nogil:
if (memcopy_threads > 1 and
buffer_len > kMemcopyDefaultThreshold):
parallel_memcopy(ptr + buffer_addr,
<const uint8_t*> self.buffers[i].buf,
buffer_len,
kMemcopyDefaultBlocksize, memcopy_threads)
else:
memcpy(ptr + buffer_addr, self.buffers[i].buf, buffer_len)


cdef class SerializedObject(object):
Expand Down Expand Up @@ -487,9 +489,10 @@ cdef class MessagePackSerializedObject(SerializedObject):
cdef uint8_t *ptr = &buffer[0]

# Write msgpack data first.
memcpy(ptr, self.msgpack_header_ptr, self._msgpack_header_bytes)
memcpy(ptr + kMessagePackOffset,
self.msgpack_data_ptr, self._msgpack_data_bytes)
with nogil:
memcpy(ptr, self.msgpack_header_ptr, self._msgpack_header_bytes)
memcpy(ptr + kMessagePackOffset,
self.msgpack_data_ptr, self._msgpack_data_bytes)

if self.nest_serialized_object is not None:
self.nest_serialized_object.write_to(
Expand All @@ -516,11 +519,12 @@ cdef class RawSerializedObject(SerializedObject):
@cython.boundscheck(False)
@cython.wraparound(False)
cdef void write_to(self, uint8_t[:] buffer) nogil:
if (MEMCOPY_THREADS > 1 and
self._total_bytes > kMemcopyDefaultThreshold):
parallel_memcopy(&buffer[0],
self.value_ptr,
self._total_bytes, kMemcopyDefaultBlocksize,
MEMCOPY_THREADS)
else:
memcpy(&buffer[0], self.value_ptr, self._total_bytes)
with nogil:
if (MEMCOPY_THREADS > 1 and
self._total_bytes > kMemcopyDefaultThreshold):
parallel_memcopy(&buffer[0],
self.value_ptr,
self._total_bytes, kMemcopyDefaultBlocksize,
MEMCOPY_THREADS)
else:
memcpy(&buffer[0], self.value_ptr, self._total_bytes)

0 comments on commit 5a4c6d2

Please sign in to comment.