@@ -70,16 +70,6 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
70
70
}
71
71
}
72
72
73
- static void deallocateStreams (
74
- std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
75
- // Deallocate buffers for stream objects of the finished commands. Iterate in
76
- // reverse order because it is the order of commands execution.
77
- for (auto StreamImplPtr = StreamsToDeallocate.rbegin ();
78
- StreamImplPtr != StreamsToDeallocate.rend (); ++StreamImplPtr)
79
- detail::Scheduler::getInstance ().deallocateStreamBuffers (
80
- StreamImplPtr->get ());
81
- }
82
-
83
73
EventImplPtr Scheduler::addCG (std::unique_ptr<detail::CG> CommandGroup,
84
74
QueueImplPtr Queue) {
85
75
EventImplPtr NewEvent = nullptr ;
@@ -121,51 +111,58 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
121
111
}
122
112
123
113
std::vector<Command *> ToCleanUp;
124
- try {
114
+ {
125
115
ReadLockT Lock (MGraphLock);
126
116
127
117
Command *NewCmd = static_cast <Command *>(NewEvent->getCommand ());
128
118
129
119
EnqueueResultT Res;
130
120
bool Enqueued;
131
121
122
+ auto CleanUp = [&]() {
123
+ if (NewCmd && (NewCmd->MDeps .size () == 0 && NewCmd->MUsers .size () == 0 )) {
124
+ if (Type == CG::RunOnHostIntel)
125
+ static_cast <ExecCGCommand *>(NewCmd)->releaseCG ();
126
+
127
+ NewEvent->setCommand (nullptr );
128
+ delete NewCmd;
129
+ }
130
+ };
131
+
132
132
for (Command *Cmd : AuxiliaryCmds) {
133
133
Enqueued = GraphProcessor::enqueueCommand (Cmd, Res, ToCleanUp);
134
- if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
135
- throw runtime_error (" Auxiliary enqueue process failed." ,
136
- PI_INVALID_OPERATION);
134
+ try {
135
+ if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
136
+ throw runtime_error (" Auxiliary enqueue process failed." ,
137
+ PI_INVALID_OPERATION);
138
+ } catch (...) {
139
+ // enqueueCommand() func and if statement above may throw an exception,
140
+ // so destroy required resources to avoid memory leak
141
+ CleanUp ();
142
+ std::rethrow_exception (std::current_exception ());
143
+ }
137
144
}
138
145
139
146
if (NewCmd) {
140
147
// TODO: Check if lazy mode.
141
148
EnqueueResultT Res;
142
- bool Enqueued = GraphProcessor::enqueueCommand (NewCmd, Res, ToCleanUp);
143
- if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
144
- throw runtime_error (" Enqueue process failed." , PI_INVALID_OPERATION);
149
+ try {
150
+ bool Enqueued = GraphProcessor::enqueueCommand (NewCmd, Res, ToCleanUp);
151
+ if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
152
+ throw runtime_error (" Enqueue process failed." , PI_INVALID_OPERATION);
153
+ } catch (...) {
154
+ // enqueueCommand() func and if statement above may throw an exception,
155
+ // so destroy required resources to avoid memory leak
156
+ CleanUp ();
157
+ std::rethrow_exception (std::current_exception ());
158
+ }
145
159
146
160
// If there are no memory dependencies decouple and free the command.
147
161
// Though, dismiss ownership of native kernel command group as it's
148
162
// resources may be in use by backend and synchronization point here is
149
163
// at native kernel execution finish.
150
- if (NewCmd && (NewCmd->MDeps .size () == 0 && NewCmd->MUsers .size () == 0 )) {
151
- if (Type == CG::RunOnHostIntel)
152
- static_cast <ExecCGCommand *>(NewCmd)->releaseCG ();
153
-
154
- NewEvent->setCommand (nullptr );
155
- delete NewCmd;
156
- }
164
+ CleanUp ();
157
165
}
158
- } catch (...) {
159
- std::vector<StreamImplPtr> StreamsToDeallocate;
160
- Command *NewCmd = static_cast <Command *>(NewEvent->getCommand ());
161
- if (NewCmd) {
162
- WriteLockT Lock (MGraphLock, std::defer_lock);
163
- MGraphBuilder.cleanupFailedCommand (NewCmd, StreamsToDeallocate,
164
- ToCleanUp);
165
- }
166
- deallocateStreams (StreamsToDeallocate);
167
- cleanupCommands (ToCleanUp);
168
- std::rethrow_exception (std::current_exception ());
169
166
}
170
167
cleanupCommands (ToCleanUp);
171
168
@@ -226,6 +223,16 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
226
223
cleanupCommands (ToCleanUp);
227
224
}
228
225
226
+ static void deallocateStreams (
227
+ std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
228
+ // Deallocate buffers for stream objects of the finished commands. Iterate in
229
+ // reverse order because it is the order of commands execution.
230
+ for (auto StreamImplPtr = StreamsToDeallocate.rbegin ();
231
+ StreamImplPtr != StreamsToDeallocate.rend (); ++StreamImplPtr)
232
+ detail::Scheduler::getInstance ().deallocateStreamBuffers (
233
+ StreamImplPtr->get ());
234
+ }
235
+
229
236
void Scheduler::cleanupFinishedCommands (EventImplPtr FinishedEvent) {
230
237
// We are going to traverse a graph of finished commands. Gather stream
231
238
// objects from these commands if any and deallocate buffers for these stream
0 commit comments