Closed
18 changes: 8 additions & 10 deletions distributed/comm/ucx.py
@@ -39,12 +39,10 @@
 ucx_create_listener = None


-def synchronize_stream(stream=0):
+def synchronize_ptds():
     import numba.cuda

-    ctx = numba.cuda.current_context()
-    cu_stream = numba.cuda.driver.drvapi.cu_stream(stream)
-    stream = numba.cuda.driver.Stream(ctx, cu_stream, None)
+    stream = numba.cuda.per_thread_default_stream()
     stream.synchronize()


@@ -221,13 +219,13 @@ async def write(

 # Send frames

-# It is necessary to first synchronize the default stream before start sending
-# We synchronize the default stream because UCX is not stream-ordered and
-# syncing the default stream will wait for other non-blocking CUDA streams.
+# It is necessary to first synchronize the per-thread default stream before start sending
+# We synchronize the per-thread default stream because UCX is not stream-ordered and
+# syncing the per-thread default stream will wait for other non-blocking CUDA streams.
Comment on lines +223 to +224
Contributor

It won't wait for other threads' per-thread default streams...

Member Author

So should we leave this as-is? Or is there something else we should be doing here?

Member

I think Mark is referring to the phrase "syncing the per-thread default stream will wait for other non-blocking CUDA streams", as that line is incorrect. Syncing the per-thread default stream only waits for the current stream, WITHOUT blocking work on other non-blocking CUDA streams.

Member Author

Right, I just wasn't sure if this implied changes beyond the text in the comment.

Contributor

Assuming syncing only the PTDS is correct behaviour, I think only the comment is incorrect.

 # Note this is only sufficient if the memory being sent is not currently in use on
 # non-blocking CUDA streams.
 if any(cuda_send_frames):
-    synchronize_stream(0)
+    synchronize_ptds()

 for each_frame in send_frames:
     await self.ep.send(each_frame)
@@ -280,9 +278,9 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
 )

 # It is necessary to first populate `frames` with CUDA arrays and synchronize
-# the default stream before starting receiving to ensure buffers have been allocated
+# the per-thread default stream before starting receiving to ensure buffers have been allocated
 if any(cuda_recv_frames):
-    synchronize_stream(0)
+    synchronize_ptds()

 for each_frame in recv_frames:
     await self.ep.recv(each_frame)
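The review thread notes that syncing the per-thread default stream does not wait for other streams, and the code comment says the sync is only sufficient if the memory being sent is not in use on non-blocking CUDA streams. One way a caller could cover that case is sketched below. This is a hypothetical helper and not part of this PR: the name `synchronize_before_send` and the parameters `extra_streams` and `get_ptds` are assumptions made for illustration. The only Numba calls it relies on are `numba.cuda.per_thread_default_stream()` and the stream's `synchronize()` method, both of which appear in the diff above.

```python
def synchronize_before_send(extra_streams=(), get_ptds=None):
    """Wait for the per-thread default stream, then for each extra stream.

    Hypothetical sketch: `extra_streams` is any iterable of objects with a
    `synchronize()` method, and `get_ptds` is a zero-argument callable that
    returns the per-thread default stream (injectable for testing).
    """
    if get_ptds is None:
        # Imported lazily so this module can be imported without CUDA.
        import numba.cuda

        get_ptds = numba.cuda.per_thread_default_stream

    # Waits only for work queued on this thread's default stream.
    get_ptds().synchronize()

    # Work on other streams is not covered by the call above, so every
    # stream that touched the outgoing buffers is synchronized explicitly.
    for stream in extra_streams:
        stream.synchronize()
```

With no arguments this behaves like `synchronize_ptds()` in the diff. Whether the extra synchronization is needed depends on how the frames were produced, which this PR does not change.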