18
18
#include " llvm/Support/Path.h"
19
19
#include " llvm/Support/Program.h"
20
20
#include " llvm/Support/SystemUtils.h"
21
+ #include " llvm/Support/Threading.h"
21
22
23
+ #include < list>
22
24
#include < vector>
23
25
24
26
using namespace llvm ;
@@ -69,6 +71,17 @@ static cl::opt<std::string> OutIncrement{
69
71
" pass." ),
70
72
cl::init (" " ), cl::value_desc (" R" )};
71
73
74
+ static cl::opt<unsigned int > JobsInParallel{
75
+ " jobs" ,
76
+ cl::Optional,
77
+ cl::init (1 ),
78
+ cl::desc (" Specify the number of threads for launching input commands in "
79
+ " parallel mode" ),
80
+ };
81
+
82
+ static cl::alias JobsInParallelShort{" j" , cl::desc (" Alias for --jobs" ),
83
+ cl::aliasopt (JobsInParallel)};
84
+
72
85
static void error (const Twine &Msg) {
73
86
errs () << " llvm-foreach: " << Msg << ' \n ' ;
74
87
exit (1 );
@@ -79,6 +92,32 @@ static void error(std::error_code EC, const Twine &Prefix) {
79
92
error (Prefix + " : " + EC.message ());
80
93
}
81
94
95
+ // With BlockingWait=false this function just goes through the all
96
+ // submitted jobs to check if some of them have finished.
97
+ int checkIfJobsAreFinished (std::list<sys::ProcessInfo> &JobsSubmitted,
98
+ bool BlockingWait = true ) {
99
+ std::string ErrMsg;
100
+ auto It = JobsSubmitted.begin ();
101
+ while (It != JobsSubmitted.end ()) {
102
+ sys::ProcessInfo WaitResult =
103
+ sys::Wait (*It, 0 , /* WaitUntilTerminates*/ BlockingWait, &ErrMsg);
104
+
105
+ // Check if the job has finished (PID will be 0 if it's not).
106
+ if (!BlockingWait && !WaitResult.Pid ) {
107
+ It++;
108
+ continue ;
109
+ }
110
+ assert (BlockingWait || WaitResult.Pid );
111
+ It = JobsSubmitted.erase (It);
112
+
113
+ if (WaitResult.ReturnCode != 0 ) {
114
+ errs () << " llvm-foreach: " << ErrMsg << ' \n ' ;
115
+ return WaitResult.ReturnCode ;
116
+ }
117
+ }
118
+ return 0 ;
119
+ }
120
+
82
121
int main (int argc, char **argv) {
83
122
cl::ParseCommandLineOptions (
84
123
argc, argv,
@@ -160,6 +199,16 @@ int main(int argc, char **argv) {
160
199
PrevNumOfLines = FileLists[i].size ();
161
200
}
162
201
202
+ if (!JobsInParallel)
203
+ error (" Number of parallel threads should be a positive integer" );
204
+
205
+ size_t MaxSafeNumThreads = optimal_concurrency ().compute_thread_count ();
206
+ if (JobsInParallel > MaxSafeNumThreads) {
207
+ JobsInParallel = MaxSafeNumThreads;
208
+ outs () << " llvm-foreach: adjusted number of threads to "
209
+ << MaxSafeNumThreads << " (max safe available).\n " ;
210
+ }
211
+
163
212
std::error_code EC;
164
213
raw_fd_ostream OS{OutputFileList, EC, sys::fs::OpenFlags::OF_None};
165
214
if (!OutputFileList.empty ())
@@ -170,6 +219,7 @@ int main(int argc, char **argv) {
170
219
std::string IncOutArg;
171
220
std::vector<std::string> ResInArgs (InReplaceArgs.size ());
172
221
std::string ResFileList = " " ;
222
+ std::list<sys::ProcessInfo> JobsSubmitted;
173
223
for (size_t j = 0 ; j != FileLists[0 ].size (); ++j) {
174
224
for (size_t i = 0 ; i < InReplaceArgs.size (); ++i) {
175
225
ArgumentReplace CurReplace = InReplaceArgs[i];
@@ -221,17 +271,23 @@ int main(int argc, char **argv) {
221
271
Args[OutIncrementArg.ArgNum ] = IncOutArg;
222
272
}
223
273
224
- std::string ErrMsg;
225
- // TODO: Add possibility to execute commands in parallel .
226
- int Result =
227
- sys::ExecuteAndWait (Prog, Args, /* Env= */ None, /* Redirects= */ None,
228
- /* SecondsToWait= */ 0 , /* MemoryLimit= */ 0 , &ErrMsg);
229
- if (Result != 0 ) {
230
- errs () << " llvm-foreach: " << ErrMsg << ' \n ' ;
231
- Res = Result;
232
- }
274
+ // Do not start execution of a new job until previous one(s) are finished,
275
+ // if the maximum number of parallel workers is reached .
276
+ while (JobsSubmitted. size () == JobsInParallel)
277
+ if ( int Result =
278
+ checkIfJobsAreFinished (JobsSubmitted , /* BlockingWait */ false ))
279
+ Res = Result;
280
+
281
+ JobsSubmitted. emplace_back ( sys::ExecuteNoWait (
282
+ Prog, Args, /* Env= */ None, /* Redirects= */ None, /* MemoryLimit= */ 0 ));
233
283
}
234
284
285
+ // Wait for all commands to be executed.
286
+ while (!JobsSubmitted.empty ())
287
+ if (int Result =
288
+ checkIfJobsAreFinished (JobsSubmitted, /* BlockingWait*/ true ))
289
+ Res = Result;
290
+
235
291
if (!OutputFileList.empty ()) {
236
292
OS.close ();
237
293
}
0 commit comments