[SYCL][CUDA][HIP][PI] Fix barrier #6490


Merged · 9 commits · Oct 6, 2022
117 changes: 89 additions & 28 deletions sycl/plugins/cuda/pi_cuda.cpp
@@ -381,8 +381,25 @@ pi_result cuda_piEventRetain(pi_event event);

/// \endcond

void _pi_queue::compute_stream_wait_for_barrier_if_needed(CUstream stream,
pi_uint32 stream_i) {
if (barrier_event_ && !compute_applied_barrier_[stream_i]) {
Contributor:

Would it make sense to check if the barrier event has finished prior to this so we can potentially clear it and skip the cuStreamWaitEvent call?

Contributor (author):

You are proposing an additional CUDA API call, potentially on every operation, in exchange for possibly eliminating calls later. Whether that pays off depends on whether most streams have work enqueued to them before or after all of the pre-barrier work has finished. If the answer is before, the current code will be more performant; if the answer is after, we want to do what you suggest.

My gut feeling is that the current implementation will be better for most use cases. However, that reasoning assumes cuStreamWaitEvent and cuEventQuery take roughly the same time; measuring that would give us more information to inform the decision. For now I would leave this as it is.

Contributor:

I agree this is potentially more work up-front, though I would expect cuEventQuery to be fairly lightweight; some benchmarking may make sense before making a final decision. I am okay with keeping this as-is. 😄

PI_CHECK_ERROR(cuStreamWaitEvent(stream, barrier_event_, 0));
compute_applied_barrier_[stream_i] = true;
}
}
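For reference, a minimal sketch (not part of this PR) of the alternative discussed in the thread above: query the barrier event first and only fall back to cuStreamWaitEvent when it has not completed yet. Member names are taken from this diff; the early-out shown here only marks the current stream and leaves clearing barrier_event_ itself to a separate, properly synchronized path.

// Illustrative sketch only - the reviewer's cuEventQuery variant, not the code
// merged in this PR.
void _pi_queue::compute_stream_wait_for_barrier_if_needed(CUstream stream,
                                                          pi_uint32 stream_i) {
  if (barrier_event_ && !compute_applied_barrier_[stream_i]) {
    // cuEventQuery returns CUDA_SUCCESS once all work captured by the event
    // has completed, and CUDA_ERROR_NOT_READY otherwise.
    if (cuEventQuery(barrier_event_) == CUDA_SUCCESS) {
      // The barrier has already been satisfied; no need to make this stream
      // wait on it.
      compute_applied_barrier_[stream_i] = true;
    } else {
      PI_CHECK_ERROR(cuStreamWaitEvent(stream, barrier_event_, 0));
      compute_applied_barrier_[stream_i] = true;
    }
  }
}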

void _pi_queue::transfer_stream_wait_for_barrier_if_needed(CUstream stream,
pi_uint32 stream_i) {
if (barrier_event_ && !transfer_applied_barrier_[stream_i]) {
PI_CHECK_ERROR(cuStreamWaitEvent(stream, barrier_event_, 0));
transfer_applied_barrier_[stream_i] = true;
}
}

CUstream _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) {
pi_uint32 stream_i;
pi_uint32 token;
while (true) {
if (num_compute_streams_ < compute_streams_.size()) {
// the check above is for performance - so as not to lock mutex every time
@@ -394,40 +411,46 @@ CUstream _pi_queue::get_next_compute_stream(pi_uint32 *stream_token) {
cuStreamCreate(&compute_streams_[num_compute_streams_++], flags_));
}
}
stream_i = compute_stream_idx_++;
token = compute_stream_idx_++;
stream_i = token % compute_streams_.size();
// If a stream has been reused before it would next be selected in round-robin
// fashion, we want to delay its next use and instead select another one
// that is more likely to have completed all the enqueued work.
if (delay_compute_[stream_i % compute_streams_.size()]) {
delay_compute_[stream_i % compute_streams_.size()] = false;
if (delay_compute_[stream_i]) {
delay_compute_[stream_i] = false;
} else {
break;
}
}
if (stream_token) {
*stream_token = stream_i;
*stream_token = token;
}
return compute_streams_[stream_i % compute_streams_.size()];
CUstream res = compute_streams_[stream_i];
compute_stream_wait_for_barrier_if_needed(res, stream_i);
return res;
}

CUstream _pi_queue::get_next_compute_stream(pi_uint32 num_events_in_wait_list,
const pi_event *event_wait_list,
_pi_stream_guard &guard,
pi_uint32 *stream_token) {
for (pi_uint32 i = 0; i < num_events_in_wait_list; i++) {
pi_uint32 token = event_wait_list[i]->get_stream_token();
pi_uint32 token = event_wait_list[i]->get_compute_stream_token();
if (event_wait_list[i]->get_queue() == this && can_reuse_stream(token)) {
std::unique_lock<std::mutex> compute_sync_guard(
compute_stream_sync_mutex_);
// redo the check after lock to avoid data races on
// last_sync_compute_streams_
if (can_reuse_stream(token)) {
delay_compute_[token % delay_compute_.size()] = true;
pi_uint32 stream_i = token % delay_compute_.size();
delay_compute_[stream_i] = true;
if (stream_token) {
*stream_token = token;
}
guard = _pi_stream_guard{std::move(compute_sync_guard)};
return event_wait_list[i]->get_stream();
CUstream res = event_wait_list[i]->get_stream();
compute_stream_wait_for_barrier_if_needed(res, stream_i);
return res;
}
}
}
@@ -449,7 +472,10 @@ CUstream _pi_queue::get_next_transfer_stream() {
cuStreamCreate(&transfer_streams_[num_transfer_streams_++], flags_));
}
}
return transfer_streams_[transfer_stream_idx_++ % transfer_streams_.size()];
pi_uint32 stream_i = transfer_stream_idx_++ % transfer_streams_.size();
CUstream res = transfer_streams_[stream_i];
transfer_stream_wait_for_barrier_if_needed(res, stream_i);
return res;
}

_pi_event::_pi_event(pi_command_type type, pi_context context, pi_queue queue,
@@ -2535,7 +2561,7 @@ pi_result cuda_piQueueFinish(pi_queue command_queue) {
nullptr); // need PI_ERROR_INVALID_EXTERNAL_HANDLE error code
ScopedContext active(command_queue->get_context());

command_queue->sync_streams([&result](CUstream s) {
command_queue->sync_streams</*ResetUsed=*/true>([&result](CUstream s) {
result = PI_CHECK_ERROR(cuStreamSynchronize(s));
});

Expand Down Expand Up @@ -3860,35 +3886,70 @@ pi_result cuda_piEnqueueEventsWaitWithBarrier(pi_queue command_queue,
pi_uint32 num_events_in_wait_list,
const pi_event *event_wait_list,
pi_event *event) {
// This function makes one stream wait for all previously enqueued work (or
// for the work represented by the input events) and then makes all future
// work wait on that stream.
if (!command_queue) {
return PI_ERROR_INVALID_QUEUE;
}

pi_result result;

try {
ScopedContext active(command_queue->get_context());
pi_uint32 stream_token;
_pi_stream_guard guard;
CUstream cuStream = command_queue->get_next_compute_stream(
num_events_in_wait_list, event_wait_list, guard, &stream_token);
{
std::lock_guard<std::mutex> lock(command_queue->barrier_mutex_);
if (command_queue->barrier_event_ == nullptr) {
PI_CHECK_ERROR(cuEventCreate(&command_queue->barrier_event_,
CU_EVENT_DISABLE_TIMING));
}
if (num_events_in_wait_list == 0) { // wait on all work
if (command_queue->barrier_tmp_event_ == nullptr) {
PI_CHECK_ERROR(cuEventCreate(&command_queue->barrier_tmp_event_,
CU_EVENT_DISABLE_TIMING));
}
command_queue->sync_streams(
[cuStream,
tmp_event = command_queue->barrier_tmp_event_](CUstream s) {
if (cuStream != s) {
// record a new CUDA event on every stream and make one stream
// wait for these events
PI_CHECK_ERROR(cuEventRecord(tmp_event, s));
PI_CHECK_ERROR(cuStreamWaitEvent(cuStream, tmp_event, 0));
}
});
} else { // wait just on given events
forLatestEvents(event_wait_list, num_events_in_wait_list,
[cuStream](pi_event event) -> pi_result {
if (event->get_queue()->has_been_synchronized(
event->get_compute_stream_token())) {
return PI_SUCCESS;
} else {
return PI_CHECK_ERROR(
cuStreamWaitEvent(cuStream, event->get(), 0));
}
});
}

if (event_wait_list) {
auto result =
forLatestEvents(event_wait_list, num_events_in_wait_list,
[command_queue](pi_event event) -> pi_result {
if (event->get_queue()->has_been_synchronized(
event->get_stream_token())) {
return PI_SUCCESS;
} else {
return enqueueEventWait(command_queue, event);
}
});

if (result != PI_SUCCESS) {
return result;
result = PI_CHECK_ERROR(
cuEventRecord(command_queue->barrier_event_, cuStream));
for (unsigned int i = 0;
i < command_queue->compute_applied_barrier_.size(); i++) {
command_queue->compute_applied_barrier_[i] = false;
}
for (unsigned int i = 0;
i < command_queue->transfer_applied_barrier_.size(); i++) {
command_queue->transfer_applied_barrier_[i] = false;
}
}
if (result != PI_SUCCESS) {
return result;
}

if (event) {
pi_uint32 stream_token;
_pi_stream_guard guard;
CUstream cuStream = command_queue->get_next_compute_stream(
num_events_in_wait_list, event_wait_list, guard, &stream_token);
*event = _pi_event::make_native(PI_COMMAND_TYPE_MARKER, command_queue,
cuStream, stream_token);
(*event)->start();
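For context (not part of the diff): at the SYCL level this entry point backs the queue barrier extension. A hedged sketch of the behaviour the plugin has to provide, assuming DPC++'s ext_oneapi_submit_barrier() queue extension and the default unnamed-lambda mode, could look like this:

#include <sycl/sycl.hpp>

int main() {
  sycl::queue q{sycl::gpu_selector_v};

  // Work submitted before the barrier may be spread across several of the
  // plugin's CUDA streams.
  q.submit([&](sycl::handler &cgh) { cgh.single_task([] { /* task A */ }); });
  q.submit([&](sycl::handler &cgh) { cgh.single_task([] { /* task B */ }); });

  // Maps to cuda_piEnqueueEventsWaitWithBarrier: one stream is made to wait
  // on all previously submitted work, and barrier_event_ is recorded on it.
  q.ext_oneapi_submit_barrier();

  // Work submitted after the barrier must not start before A and B have
  // finished, no matter which stream it is scheduled on.
  q.submit([&](sycl::handler &cgh) { cgh.single_task([] { /* task C */ }); });
  q.wait();
  return 0;
}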
27 changes: 22 additions & 5 deletions sycl/plugins/cuda/pi_cuda.hpp
@@ -408,9 +408,14 @@ struct _pi_queue {
// will be skipped the next time it would be selected round-robin style. When
// skipped, its delay flag is cleared.
std::vector<bool> delay_compute_;
// keep track of which streams have already applied the barrier
std::vector<bool> compute_applied_barrier_;
std::vector<bool> transfer_applied_barrier_;
_pi_context *context_;
_pi_device *device_;
pi_queue_properties properties_;
CUevent barrier_event_ = nullptr;
CUevent barrier_tmp_event_ = nullptr;
std::atomic_uint32_t refCount_;
std::atomic_uint32_t eventCount_;
std::atomic_uint32_t compute_stream_idx_;
@@ -426,6 +431,7 @@
std::mutex compute_stream_sync_mutex_;
std::mutex compute_stream_mutex_;
std::mutex transfer_stream_mutex_;
std::mutex barrier_mutex_;
bool has_ownership_;

_pi_queue(std::vector<CUstream> &&compute_streams,
@@ -434,7 +440,9 @@
unsigned int flags, bool backend_owns = true)
: compute_streams_{std::move(compute_streams)},
transfer_streams_{std::move(transfer_streams)},
delay_compute_(compute_streams_.size(), false), context_{context},
delay_compute_(compute_streams_.size(), false),
compute_applied_barrier_(compute_streams_.size()),
transfer_applied_barrier_(transfer_streams_.size()), context_{context},
device_{device}, properties_{properties}, refCount_{1}, eventCount_{0},
compute_stream_idx_{0}, transfer_stream_idx_{0},
num_compute_streams_{0}, num_transfer_streams_{0},
@@ -449,6 +457,11 @@
cuda_piDeviceRelease(device_);
}

void compute_stream_wait_for_barrier_if_needed(CUstream stream,
pi_uint32 stream_i);
void transfer_stream_wait_for_barrier_if_needed(CUstream stream,
pi_uint32 stream_i);

// get_next_compute/transfer_stream() functions return streams from
// appropriate pools in round-robin fashion
native_type get_next_compute_stream(pi_uint32 *stream_token = nullptr);
@@ -513,7 +526,7 @@
}
}

template <typename T> void sync_streams(T &&f) {
template <bool ResetUsed = false, typename T> void sync_streams(T &&f) {
auto sync_compute = [&f, &streams = compute_streams_,
&delay = delay_compute_](unsigned int start,
unsigned int stop) {
@@ -536,7 +549,9 @@
unsigned int end = num_compute_streams_ < size
? num_compute_streams_
: compute_stream_idx_.load();
last_sync_compute_streams_ = end;
if (ResetUsed) {
last_sync_compute_streams_ = end;
}
if (end - start >= size) {
sync_compute(0, size);
} else {
@@ -558,7 +573,9 @@
unsigned int end = num_transfer_streams_ < size
? num_transfer_streams_
: transfer_stream_idx_.load();
last_sync_transfer_streams_ = end;
if (ResetUsed) {
last_sync_transfer_streams_ = end;
}
if (end - start >= size) {
sync_transfer(0, size);
} else {
@@ -610,7 +627,7 @@ struct _pi_event {

CUstream get_stream() const noexcept { return stream_; }

pi_uint32 get_stream_token() const noexcept { return streamToken_; }
pi_uint32 get_compute_stream_token() const noexcept { return streamToken_; }

pi_command_type get_command_type() const noexcept { return commandType_; }

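One reading of the new ResetUsed template parameter (my interpretation, not spelled out in the PR): last_sync_compute_streams_ and last_sync_transfer_streams_ feed can_reuse_stream() and has_been_synchronized(), so they should only advance when the streams really have been synchronized with the host. A condensed sketch of the two call sites, using the names from this diff:

// cuda_piQueueFinish: the callback host-synchronizes every stream, so it is
// safe to advance the last_sync_* watermarks (ResetUsed = true).
command_queue->sync_streams</*ResetUsed=*/true>([&result](CUstream s) {
  result = PI_CHECK_ERROR(cuStreamSynchronize(s));
});

// Barrier path: the callback only records an event per stream and makes the
// chosen stream wait on it; nothing is synchronized with the host, so the
// watermarks must stay where they are (ResetUsed defaults to false).
command_queue->sync_streams(
    [cuStream, tmp_event = command_queue->barrier_tmp_event_](CUstream s) {
      if (cuStream != s) {
        PI_CHECK_ERROR(cuEventRecord(tmp_event, s));
        PI_CHECK_ERROR(cuStreamWaitEvent(cuStream, tmp_event, 0));
      }
    });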