-
Notifications
You must be signed in to change notification settings - Fork 0
/
clUtilParallelFor.cc
136 lines (106 loc) · 4.2 KB
/
clUtilParallelFor.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include "clUtilParallelFor.h"
#include <math.h>
// Two-level stringification: the extra indirection makes the preprocessor
// macro-expand the argument first (e.g. __LINE__ -> 42) before '#' turns it
// into a string literal. A single-level '#arg' would produce the literal text
// "__LINE__" instead of the line number.
#define STRINGIFY_IMPL(arg) #arg
#define STRINGIFY(arg) STRINGIFY_IMPL(arg)
// String literal of the form "file:line" for the point of use, intended to be
// concatenated into error messages.
// NOTE: identifiers with a leading double underscore are reserved for the
// implementation; the name is kept for compatibility with existing uses.
#define __WHERE__ __FILE__ ":" STRINGIFY(__LINE__)
using namespace std;
using namespace clUtil;
using namespace clUtil::Utility;
// Out-of-class definitions for DeviceGroupInfo's static members.
// The flag starts out false; presumably it is flipped once the singleton has
// been populated — confirm in DeviceGroupInfo.
bool DeviceGroupInfo::singletonInitialized{false};
// The singleton instance itself, default-initialized here.
DeviceGroupInfo DeviceGroupInfo::deviceGroupInfoSingleton;
void clUtil::ParallelFor(const size_t start,
const size_t stride,
const size_t end,
function<void (size_t, size_t)> loopBody,
IScheduler&& model)
{
size_t oldDeviceNum = Device::GetCurrentDeviceNum();
size_t iterationsRemaining = end - start + 1;
IndexRange range;
range.Start = start;
range.End = end;
model.setRange(range);
//Initialize device statuses
vector<DeviceStatus> deviceStatuses(Device::GetDevices().size());
for(size_t curDeviceID = 0;
curDeviceID < deviceStatuses.size();
curDeviceID++)
{
deviceStatuses[curDeviceID].DeviceID = curDeviceID;
}
//Parallel for scheduling loop
while(iterationsRemaining > 0)
{
for(size_t curDeviceID = 0;
curDeviceID < Device::GetDevices().size();
curDeviceID++)
{
DeviceStatus& curDeviceStatus = deviceStatuses[curDeviceID];
size_t deviceGroup = DeviceGroupInfo::Get()[curDeviceID];
//If this device isn't busy, get some work from the model and run it
if(curDeviceStatus.IsBusy == false &&
model.workRemains(deviceGroup) == true)
{
IndexRange work;
work = model.getWork(deviceGroup);
Device::SetCurrentDevice(curDeviceID);
Device& curDevice = Device::GetCurrentDevice();
curDeviceStatus.Range = work;
curDeviceStatus.Time1 = getTime();
curDeviceStatus.IsBusy = true;
loopBody(work.Start, work.End);
//We indicate this device is finished by enqueueing markers into
//every queues, then enqueueing a waitForEvents which depends on the
//markers. Then enqueue one more marker so we can capture its event
size_t prevCommandQueue = curDevice.getCommandQueueID();
unique_ptr<cl_event[]>
markerList(new cl_event[curDevice.getNumCommandQueues()]);
for(size_t curQueueID = 0;
curQueueID < curDevice.getNumCommandQueues();
curQueueID++)
{
curDevice.setCommandQueue(curQueueID);
clEnqueueMarker(curDevice.getCommandQueue(),
&markerList[curQueueID]);
}
curDevice.setCommandQueue(prevCommandQueue);
clEnqueueWaitForEvents(curDevice.getCommandQueue(),
curDevice.getNumCommandQueues(),
markerList.get());
clEnqueueMarker(curDevice.getCommandQueue(),
&curDeviceStatus.WaitEvent);
//Release the horses from the gates
curDevice.flush();
}
if(curDeviceStatus.WaitEvent != NULL) //If device has a valid event...
{
//Poll the event for completion
cl_int eventStatus;
cl_int err = clGetEventInfo(curDeviceStatus.WaitEvent,
CL_EVENT_COMMAND_EXECUTION_STATUS,
sizeof(eventStatus),
&eventStatus,
NULL);
if(err != CL_SUCCESS)
{
throw clUtilException("ParallelFor internal error: could not get "
"event info " __WHERE__ "\n");
}
//If done, mark this device as available, notify the model, and
//reset this device for more work
if(eventStatus == CL_COMPLETE)
{
curDeviceStatus.Time2 = getTime();
model.updateModel(curDeviceStatus);
curDeviceStatus.IsBusy = false;
clReleaseEvent(curDeviceStatus.WaitEvent);
curDeviceStatus.WaitEvent = NULL;
iterationsRemaining -= curDeviceStatus.Range.End -
curDeviceStatus.Range.Start +
1;
//cout << "Iterations remaining " << iterationsRemaining << endl;
}
}
}
}
Device::SetCurrentDevice(oldDeviceNum);
}