Skip to content

[SYCL] Fix memory dependency leaks caused by failed kernel enqueue #5120

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 2, 2022
Merged
72 changes: 66 additions & 6 deletions sycl/source/detail/scheduler/graph_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,7 @@ MemObjRecord *Scheduler::GraphBuilder::getOrInsertMemObjRecord(

void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
MemObjRecord *Record,
access::mode AccessMode,
std::vector<Command *> &ToCleanUp) {

const bool ReadOnlyReq = AccessMode == access::mode::read;
if (ReadOnlyReq)
return;

for (Command *Cmd : Cmds) {
bool WasLeaf = Cmd->MLeafCounter > 0;
Cmd->MLeafCounter -= Record->MReadLeaves.remove(Cmd);
Expand All @@ -252,6 +246,18 @@ void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
}
}

/// Access-mode-aware variant of updateLeaves(): forwards to the overload that
/// takes no access mode, except when the requirement is read-only.
void Scheduler::GraphBuilder::updateLeaves(const std::set<Command *> &Cmds,
                                           MemObjRecord *Record,
                                           access::mode AccessMode,
                                           std::vector<Command *> &ToCleanUp) {
  // A pure read access changes nothing here; any other mode triggers the
  // actual leaf update.
  if (AccessMode != access::mode::read)
    updateLeaves(Cmds, Record, ToCleanUp);
}

void Scheduler::GraphBuilder::addNodeToLeaves(
MemObjRecord *Record, Command *Cmd, access::mode AccessMode,
std::vector<Command *> &ToEnqueue) {
Expand Down Expand Up @@ -1253,6 +1259,60 @@ void Scheduler::GraphBuilder::cleanupFinishedCommands(
handleVisitedNodes(MVisitedCmds);
}

/// Detaches a command whose enqueue failed from the dependency graph.
///
/// Unless the command has neither dependencies nor users, a freshly created
/// EmptyCommand marked SyclEnqueueReady takes its place everywhere the failed
/// command was referenced in this function's view of the graph: in the leaves
/// of each memory object record it depended on, in the MUsers of each command
/// it depended on, in the MDeps of each of its users, and as the command
/// attached to its event. The failed command is then deleted.
///
/// FailedCmd is the command to replace. StreamsToDeallocate receives the
/// stream objects taken from FailedCmd when it is a RUN_CG command. ToCleanUp
/// is passed through to updateLeaves().
void Scheduler::GraphBuilder::cleanupFailedCommand(
Command *FailedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>>
&StreamsToDeallocate,
std::vector<Command *> &ToCleanUp) {

// If the failed command has no users and no dependencies, there is no reason
// to replace it with an empty command.
// NOTE(review): on this path FailedCmd is neither replaced nor deleted by
// this function, and StreamsToDeallocate is left untouched — confirm the
// caller is responsible for releasing the command in that case.
if (FailedCmd->MDeps.size() == 0 && FailedCmd->MUsers.size() == 0)
return;

// Create empty command that is "ready" for enqueuing.
EmptyCommand *EmptyCmd = new EmptyCommand(FailedCmd->getQueue());
// NOTE(review): a plain (non-nothrow) new-expression reports allocation
// failure by throwing std::bad_alloc rather than returning null, so this
// check looks unreachable — confirm before relying on it.
if (!EmptyCmd)
throw runtime_error("Out of host memory", PI_OUT_OF_HOST_MEMORY);
EmptyCmd->MEnqueueStatus = EnqueueResultT::SyclEnqueueReady;

// Collect stream objects for the failed command.
// The streams are copied out first, then cleared on the command, and the
// copies are appended to the caller-provided vector.
if (FailedCmd->getType() == Command::CommandType::RUN_CG) {
auto ExecCmd = static_cast<ExecCGCommand *>(FailedCmd);
std::vector<std::shared_ptr<stream_impl>> Streams = ExecCmd->getStreams();
ExecCmd->clearStreams();
StreamsToDeallocate.insert(StreamsToDeallocate.end(), Streams.begin(),
Streams.end());
Comment on lines +1280 to +1285
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do similar handling for reduction resources here once #5653 lands?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, absolutely! I will make sure it is included into whichever is merged last.

}

for (DepDesc &Dep : FailedCmd->MDeps) {
// Replace failed command in dependency records.
// The overload without an access mode is used, so the removal is not
// skipped for read-only requirements.
const Requirement *Req = Dep.MDepRequirement;
MemObjRecord *Record = getMemObjRecord(Req->MSYCLMemObj);
updateLeaves({FailedCmd}, Record, ToCleanUp);
std::vector<Command *> ToEnqueue;
addNodeToLeaves(Record, EmptyCmd, Req->MAccessMode, ToEnqueue);
// Adding the empty command as a leaf is not expected to produce anything
// that needs enqueuing.
assert(ToEnqueue.empty());

// Replace failed command as a user.
// The edge is moved to EmptyCmd only when erase() reports that FailedCmd
// was actually removed from the dependency's users.
if (Dep.MDepCommand->MUsers.erase(FailedCmd)) {
Dep.MDepCommand->MUsers.insert(EmptyCmd);
EmptyCmd->MDeps.push_back(Dep);
}
}
FailedCmd->MDeps.clear();

// Redirect every dependency descriptor of the users that points at the
// failed command to the empty command, then exchange the two user sets so
// that EmptyCmd ends up holding FailedCmd's users.
for (Command *UserCmd : FailedCmd->MUsers)
for (DepDesc &Dep : UserCmd->MDeps)
if (Dep.MDepCommand == FailedCmd)
Dep.MDepCommand = EmptyCmd;
std::swap(FailedCmd->MUsers, EmptyCmd->MUsers);

// Re-point the failed command's event at the replacement before the failed
// command is destroyed; by now it must no longer be counted as a leaf.
FailedCmd->getEvent()->setCommand(EmptyCmd);
assert(FailedCmd->MLeafCounter == 0);
delete FailedCmd;
}

void Scheduler::GraphBuilder::removeRecordForMemObj(SYCLMemObjI *MemObject) {
const auto It = std::find_if(
MMemObjs.begin(), MMemObjs.end(),
Expand Down
1 change: 1 addition & 0 deletions sycl/source/detail/scheduler/leaves_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ static inline bool doOverlap(const Requirement *LHS, const Requirement *RHS) {

/// Tells whether \p Cmd is an empty task that is currently blocked with
/// HostAccessor as the block reason.
static inline bool isHostAccessorCmd(Command *Cmd) {
  // The checks run in the same order as a short-circuiting conjunction.
  if (Cmd->getType() != Command::EMPTY_TASK)
    return false;
  if (Cmd->MEnqueueStatus != EnqueueResultT::SyclEnqueueBlocked)
    return false;
  return Cmd->MBlockReason == Command::BlockReason::HostAccessor;
}

Expand Down
77 changes: 35 additions & 42 deletions sycl/source/detail/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
}
}

static void deallocateStreams(
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
// Deallocate buffers for stream objects of the finished commands. Iterate in
// reverse order because it is the order of commands execution.
for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
detail::Scheduler::getInstance().deallocateStreamBuffers(
StreamImplPtr->get());
}

EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
QueueImplPtr Queue) {
EventImplPtr NewEvent = nullptr;
Expand Down Expand Up @@ -111,58 +121,51 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
}

std::vector<Command *> ToCleanUp;
{
try {
ReadLockT Lock(MGraphLock);

Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());

EnqueueResultT Res;
bool Enqueued;

auto CleanUp = [&]() {
if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
if (Type == CG::RunOnHostIntel)
static_cast<ExecCGCommand *>(NewCmd)->releaseCG();

NewEvent->setCommand(nullptr);
delete NewCmd;
}
};

for (Command *Cmd : AuxiliaryCmds) {
Enqueued = GraphProcessor::enqueueCommand(Cmd, Res, ToCleanUp);
try {
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Auxiliary enqueue process failed.",
PI_INVALID_OPERATION);
} catch (...) {
// enqueueCommand() func and if statement above may throw an exception,
// so destroy required resources to avoid memory leak
CleanUp();
std::rethrow_exception(std::current_exception());
}
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Auxiliary enqueue process failed.",
PI_INVALID_OPERATION);
}

if (NewCmd) {
// TODO: Check if lazy mode.
EnqueueResultT Res;
try {
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);
} catch (...) {
// enqueueCommand() func and if statement above may throw an exception,
// so destroy required resources to avoid memory leak
CleanUp();
std::rethrow_exception(std::current_exception());
}
bool Enqueued = GraphProcessor::enqueueCommand(NewCmd, Res, ToCleanUp);
if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult)
throw runtime_error("Enqueue process failed.", PI_INVALID_OPERATION);

// If there are no memory dependencies decouple and free the command.
// Though, dismiss ownership of native kernel command group as its
// resources may be in use by backend and synchronization point here is
// at native kernel execution finish.
CleanUp();
if (NewCmd && (NewCmd->MDeps.size() == 0 && NewCmd->MUsers.size() == 0)) {
if (Type == CG::RunOnHostIntel)
static_cast<ExecCGCommand *>(NewCmd)->releaseCG();

NewEvent->setCommand(nullptr);
delete NewCmd;
}
}
} catch (...) {
std::vector<StreamImplPtr> StreamsToDeallocate;
Command *NewCmd = static_cast<Command *>(NewEvent->getCommand());
if (NewCmd) {
WriteLockT Lock(MGraphLock, std::defer_lock);
MGraphBuilder.cleanupFailedCommand(NewCmd, StreamsToDeallocate,
ToCleanUp);
}
deallocateStreams(StreamsToDeallocate);
cleanupCommands(ToCleanUp);
std::rethrow_exception(std::current_exception());
}
cleanupCommands(ToCleanUp);

Expand Down Expand Up @@ -223,16 +226,6 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
cleanupCommands(ToCleanUp);
}

static void deallocateStreams(
std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
// Deallocate buffers for stream objects of the finished commands. Iterate in
// reverse order because it is the order of commands execution.
for (auto StreamImplPtr = StreamsToDeallocate.rbegin();
StreamImplPtr != StreamsToDeallocate.rend(); ++StreamImplPtr)
detail::Scheduler::getInstance().deallocateStreamBuffers(
StreamImplPtr->get());
}

void Scheduler::cleanupFinishedCommands(EventImplPtr FinishedEvent) {
// We are going to traverse a graph of finished commands. Gather stream
// objects from these commands if any and deallocate buffers for these stream
Expand Down
9 changes: 9 additions & 0 deletions sycl/source/detail/scheduler/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,13 @@ class Scheduler {
Command *FinishedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &);

/// Replaces a failed command in the subgraph with an empty command and
/// deletes the failed command.
///
/// When the failed command has neither dependencies nor users, no replacement
/// is made and the function returns immediately. The unnamed vector parameter
/// receives the stream objects collected from the failed command when it is a
/// RUN_CG command; ToCleanUp is forwarded to updateLeaves().
void cleanupFailedCommand(
Command *FailedCmd,
std::vector<std::shared_ptr<cl::sycl::detail::stream_impl>> &,
std::vector<Command *> &ToCleanUp);

/// Reschedules the command passed using Queue provided.
///
/// This can lead to rescheduling of all dependent commands. This can be
Expand Down Expand Up @@ -551,6 +558,8 @@ class Scheduler {
std::vector<Command *> &ToEnqueue);

/// Removes commands from leaves.
void updateLeaves(const std::set<Command *> &Cmds, MemObjRecord *Record,
std::vector<Command *> &ToCleanUp);
/// Access-mode-aware variant: does nothing when AccessMode is
/// access::mode::read, otherwise forwards to the overload above.
void updateLeaves(const std::set<Command *> &Cmds, MemObjRecord *Record,
access::mode AccessMode,
std::vector<Command *> &ToCleanUp);
Expand Down
Loading