@@ -70,6 +70,16 @@ void Scheduler::waitForRecordToFinish(MemObjRecord *Record,
70
70
}
71
71
}
72
72
73
+ static void deallocateStreams (
74
+ std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
75
+ // Deallocate buffers for stream objects of the finished commands. Iterate in
76
+ // reverse order because it is the order of commands execution.
77
+ for (auto StreamImplPtr = StreamsToDeallocate.rbegin ();
78
+ StreamImplPtr != StreamsToDeallocate.rend (); ++StreamImplPtr)
79
+ detail::Scheduler::getInstance ().deallocateStreamBuffers (
80
+ StreamImplPtr->get ());
81
+ }
82
+
73
83
EventImplPtr Scheduler::addCG (std::unique_ptr<detail::CG> CommandGroup,
74
84
QueueImplPtr Queue) {
75
85
EventImplPtr NewEvent = nullptr ;
@@ -111,58 +121,51 @@ EventImplPtr Scheduler::addCG(std::unique_ptr<detail::CG> CommandGroup,
111
121
}
112
122
113
123
std::vector<Command *> ToCleanUp;
114
- {
124
+ try {
115
125
ReadLockT Lock (MGraphLock);
116
126
117
127
Command *NewCmd = static_cast <Command *>(NewEvent->getCommand ());
118
128
119
129
EnqueueResultT Res;
120
130
bool Enqueued;
121
131
122
- auto CleanUp = [&]() {
123
- if (NewCmd && (NewCmd->MDeps .size () == 0 && NewCmd->MUsers .size () == 0 )) {
124
- if (Type == CG::RunOnHostIntel)
125
- static_cast <ExecCGCommand *>(NewCmd)->releaseCG ();
126
-
127
- NewEvent->setCommand (nullptr );
128
- delete NewCmd;
129
- }
130
- };
131
-
132
132
for (Command *Cmd : AuxiliaryCmds) {
133
133
Enqueued = GraphProcessor::enqueueCommand (Cmd, Res, ToCleanUp);
134
- try {
135
- if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
136
- throw runtime_error (" Auxiliary enqueue process failed." ,
137
- PI_INVALID_OPERATION);
138
- } catch (...) {
139
- // enqueueCommand() func and if statement above may throw an exception,
140
- // so destroy required resources to avoid memory leak
141
- CleanUp ();
142
- std::rethrow_exception (std::current_exception ());
143
- }
134
+ if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
135
+ throw runtime_error (" Auxiliary enqueue process failed." ,
136
+ PI_INVALID_OPERATION);
144
137
}
145
138
146
139
if (NewCmd) {
147
140
// TODO: Check if lazy mode.
148
141
EnqueueResultT Res;
149
- try {
150
- bool Enqueued = GraphProcessor::enqueueCommand (NewCmd, Res, ToCleanUp);
151
- if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
152
- throw runtime_error (" Enqueue process failed." , PI_INVALID_OPERATION);
153
- } catch (...) {
154
- // enqueueCommand() func and if statement above may throw an exception,
155
- // so destroy required resources to avoid memory leak
156
- CleanUp ();
157
- std::rethrow_exception (std::current_exception ());
158
- }
142
+ bool Enqueued = GraphProcessor::enqueueCommand (NewCmd, Res, ToCleanUp);
143
+ if (!Enqueued && EnqueueResultT::SyclEnqueueFailed == Res.MResult )
144
+ throw runtime_error (" Enqueue process failed." , PI_INVALID_OPERATION);
159
145
160
146
// If there are no memory dependencies decouple and free the command.
161
147
// Though, dismiss ownership of native kernel command group as it's
162
148
// resources may be in use by backend and synchronization point here is
163
149
// at native kernel execution finish.
164
- CleanUp ();
150
+ if (NewCmd && (NewCmd->MDeps .size () == 0 && NewCmd->MUsers .size () == 0 )) {
151
+ if (Type == CG::RunOnHostIntel)
152
+ static_cast <ExecCGCommand *>(NewCmd)->releaseCG ();
153
+
154
+ NewEvent->setCommand (nullptr );
155
+ delete NewCmd;
156
+ }
165
157
}
158
+ } catch (...) {
159
+ std::vector<StreamImplPtr> StreamsToDeallocate;
160
+ Command *NewCmd = static_cast <Command *>(NewEvent->getCommand ());
161
+ if (NewCmd) {
162
+ WriteLockT Lock (MGraphLock, std::defer_lock);
163
+ MGraphBuilder.cleanupFailedCommand (NewCmd, StreamsToDeallocate,
164
+ ToCleanUp);
165
+ }
166
+ deallocateStreams (StreamsToDeallocate);
167
+ cleanupCommands (ToCleanUp);
168
+ std::rethrow_exception (std::current_exception ());
166
169
}
167
170
cleanupCommands (ToCleanUp);
168
171
@@ -223,16 +226,6 @@ void Scheduler::waitForEvent(EventImplPtr Event) {
223
226
cleanupCommands (ToCleanUp);
224
227
}
225
228
226
- static void deallocateStreams (
227
- std::vector<std::shared_ptr<stream_impl>> &StreamsToDeallocate) {
228
- // Deallocate buffers for stream objects of the finished commands. Iterate in
229
- // reverse order because it is the order of commands execution.
230
- for (auto StreamImplPtr = StreamsToDeallocate.rbegin ();
231
- StreamImplPtr != StreamsToDeallocate.rend (); ++StreamImplPtr)
232
- detail::Scheduler::getInstance ().deallocateStreamBuffers (
233
- StreamImplPtr->get ());
234
- }
235
-
236
229
void Scheduler::cleanupFinishedCommands (EventImplPtr FinishedEvent) {
237
230
// We are going to traverse a graph of finished commands. Gather stream
238
231
// objects from these commands if any and deallocate buffers for these stream
0 commit comments