
Commit

integrate cuda
twhuang-utah committed Feb 29, 2020
1 parent 40a4ebc commit 13fef33
Showing 12 changed files with 310 additions and 115 deletions.
49 changes: 46 additions & 3 deletions examples/saxpy.cu
@@ -1,12 +1,55 @@
#include <taskflow/cudaflow.hpp>
#include <taskflow/taskflow.hpp>

// Kernel: saxpy
__global__ void saxpy(int n, float a, float *x, float *y) {
  int i = blockIdx.x*blockDim.x + threadIdx.x;
  if (i < n) {
    y[i] = a*x[i] + y[i];
  }
}

// Function: main
int main() {

  const unsigned N = 1<<20;

  tf::Taskflow taskflow;
  tf::Executor executor;

  std::vector<float> hx, hy;

  float* dx {nullptr};
  float* dy {nullptr};

  // allocate x
  auto allocate_x = taskflow.emplace([&]() {
    hx.resize(N, 1.0f);
    cudaMalloc(&dx, N*sizeof(float));
  }).name("allocate_x");

  // allocate y
  auto allocate_y = taskflow.emplace([&]() {
    hy.resize(N, 2.0f);
    cudaMalloc(&dy, N*sizeof(float));
  }).name("allocate_y");

  // saxpy cudaFlow: two H2D copies, the saxpy kernel, then two D2H copies
  auto cudaflow = taskflow.emplace([&](tf::cudaFlow& cf) {
    auto h2d_x = cf.copy(dx, hx.data(), N);   // host-to-device copy of x
    auto h2d_y = cf.copy(dy, hy.data(), N);   // host-to-device copy of y
    auto d2h_x = cf.copy(hx.data(), dx, N);   // device-to-host copy of x
    auto d2h_y = cf.copy(hy.data(), dy, N);   // device-to-host copy of y
    auto kernel = cf.kernel(
      {(N+255)/256, 1, 1}, {256, 1, 1}, 0, saxpy, N, 2.0f, dx, dy
    );
    kernel.succeed(h2d_x, h2d_y)    // kernel runs after both H2D copies
          .precede(d2h_x, d2h_y);   // D2H copies run after the kernel
  });

  cudaflow.succeed(allocate_x, allocate_y);

  executor.run(taskflow).wait();

  return 0;
}
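
A side note on the example: the two allocation tasks call cudaMalloc for dx and dy, but the program never releases the device memory. A minimal, illustrative way to add cleanup in the same task-graph style (the task name "deallocate" and its placement are assumptions, not part of this commit) is to insert the following before executor.run(taskflow).wait():

  // illustrative cleanup task, not part of this commit
  auto deallocate = taskflow.emplace([&]() {
    cudaFree(dx);   // release device buffer for x
    cudaFree(dy);   // release device buffer for y
  }).name("deallocate");

  // run the cleanup only after the saxpy cudaFlow task has finished
  deallocate.succeed(cudaflow);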

218 changes: 140 additions & 78 deletions taskflow/core/executor.hpp
@@ -203,6 +203,12 @@ class Executor {
void _invoke_static_work(Worker&, Node*);
void _invoke_dynamic_work(Worker&, Node*, Subflow&);
void _invoke_condition_work(Worker&, Node*, int&);

#ifdef TF_ENABLE_CUDA
void _invoke_cudaflow_work(Worker&, Node*);
void _invoke_cudaflow_work_impl(Worker&, Node*);
#endif

void _set_up_module_work(Node*, bool&);
void _set_up_topology(Topology*);
void _tear_down_topology(Topology**);
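
The two declarations added above rely on a new Node::CUDAFLOW_WORK handle alternative, Node::cudaFlowWork, whose definition lives in one of the changed files not shown on this page. Judging from how _invoke_cudaflow_work_impl later in this diff uses it (h.work(cf) and h.graph), it presumably looks roughly like the sketch below; the member layout is an assumption, not the committed code:

// hypothetical sketch of the cudaFlowWork handle (assumed, not shown in this diff)
struct cudaFlowWork {
  template <typename C>
  cudaFlowWork(C&& c) : work(std::forward<C>(c)) {}

  std::function<void(cudaFlow&)> work;  // user callable that builds the GPU graph
  cudaGraph graph;                      // wraps the underlying cudaGraph_t (_handle)
};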
@@ -621,99 +627,117 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
// We need to fetch num_successors first to avoid an invalid memory
// access caused by the topology being cleared.
const auto num_successors = node->num_successors();

  // switch is faster than nested if-else due to jump table
  switch(node->_handle.index()) {

    // static task
    case Node::STATIC_WORK: {
      _invoke_static_work(worker, node);
    }
    break;

    // module task
    case Node::MODULE_WORK: {
      bool first_time = !node->_has_state(Node::SPAWNED);
      bool emptiness = false;
      _set_up_module_work(node, emptiness);
      if(first_time && !emptiness) {
        return;
      }
    }
    break;

    // dynamic task
    case Node::DYNAMIC_WORK: {

      auto& subgraph = nstd::get<Node::DynamicWork>(node->_handle).subgraph;

      // Clear the subgraph before the task execution
      if(!node->_has_state(Node::SPAWNED)) {
        subgraph.clear();
      }

      Subflow fb(subgraph);

      _invoke_dynamic_work(worker, node, fb);

      // Need to create a subflow if first time & subgraph is not empty
      if(!node->_has_state(Node::SPAWNED)) {
        node->_set_state(Node::SPAWNED);
        if(!subgraph.empty()) {

          // For storing the source nodes
          PassiveVector<Node*> src;

          for(auto n : subgraph._nodes) {

            n->_topology = node->_topology;
            n->_set_up_join_counter();

            if(!fb.detached()) {
              n->_parent = node;
            }

            if(n->num_dependents() == 0) {
              src.push_back(n);
            }
          }

          const bool join = fb.joined();
          if(!join) {
            // Detach mode
            node->_topology->_join_counter.fetch_add(src.size());
          }
          else {
            // Join mode
            node->_join_counter.fetch_add(src.size());

            // spawned node needs another second-round execution
            if(node->_parent == nullptr) {
              node->_topology->_join_counter.fetch_add(1);
            }
            else {
              node->_parent->_join_counter.fetch_add(1);
            }
          }

          _schedule(src);

          if(join) {
            return;
          }
        } // End of first time
      }
    }
    break;

    // condition task
    case Node::CONDITION_WORK: {

      if(node->_has_state(Node::BRANCH)) {
        node->_join_counter = node->num_strong_dependents();
      }
      else {
        node->_join_counter = node->num_dependents();
      }

      int id;
      _invoke_condition_work(worker, node, id);

      if(id >= 0 && static_cast<size_t>(id) < num_successors) {
        node->_successors[id]->_join_counter.store(0);
        _schedule(node->_successors[id], true);
      }
      return;
    } // no need to add a break here due to the immediate return

    // cudaflow task
#ifdef TF_ENABLE_CUDA
    case Node::CUDAFLOW_WORK: {
      _invoke_cudaflow_work(worker, node);
    }
    break;
#endif

    // monostate
    default:
    break;
  }


@@ -734,7 +758,8 @@ inline void Executor::_invoke(Worker& worker, Node* node) {
Node* cache {nullptr};
size_t num_spawns {0};

auto& c = (node->_parent) ? node->_parent->_join_counter :
                            node->_topology->_join_counter;

for(size_t i=0; i<num_successors; ++i) {
if(--(node->_successors[i]->_join_counter) == 0) {
@@ -794,6 +819,43 @@ inline void Executor::_invoke_condition_work(Worker& worker, Node* node, int& id
}
}

#ifdef TF_ENABLE_CUDA
// Procedure: _invoke_cudaflow_work
inline void Executor::_invoke_cudaflow_work(Worker& worker, Node* node) {
  if(_observer) {
    _observer->on_entry(worker.id, TaskView(node));
    _invoke_cudaflow_work_impl(worker, node);
    _observer->on_exit(worker.id, TaskView(node));
  }
  else {
    _invoke_cudaflow_work_impl(worker, node);
  }
}

// Procedure: _invoke_cudaflow_work_impl
inline void Executor::_invoke_cudaflow_work_impl(Worker&, Node* node) {

  auto& h = nstd::get<Node::cudaFlowWork>(node->_handle);

  // rebuild the cudaFlow graph from scratch on every invocation
  h.graph.clear();

  cudaFlow cf(h.graph);

  // run the user callable to populate the cudaFlow graph
  h.work(cf);

  // instantiate, launch, and synchronize the captured CUDA graph on the
  // default stream, then destroy the executable graph
  cudaGraphExec_t exec;
  TF_CHECK_CUDA(
    cudaGraphInstantiate(&exec, h.graph._handle, nullptr, nullptr, 0),
    "failed to create an exec cudaGraph"
  );
  TF_CHECK_CUDA(cudaGraphLaunch(exec, 0), "failed to launch cudaGraph");
  TF_CHECK_CUDA(cudaStreamSynchronize(0), "failed to sync cudaStream");
  TF_CHECK_CUDA(
    cudaGraphExecDestroy(exec), "failed to destroy an exec cudaGraph"
  );
}
#endif
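
TF_CHECK_CUDA is defined elsewhere in the library and does not appear in this diff. It is assumed to be a thin error-checking wrapper around CUDA runtime calls; a typical shape for such a macro, shown only as a sketch, is:

// assumed sketch of a CUDA error-checking macro; the real definition is not in this diff
// (this form would require <stdexcept> and <string>)
#define TF_CHECK_CUDA(stmt, msg)                                 \
  {                                                              \
    cudaError_t tf_err_ = (stmt);                                \
    if(tf_err_ != cudaSuccess) {                                 \
      throw std::runtime_error(                                  \
        std::string(msg) + ": " + cudaGetErrorString(tf_err_)    \
      );                                                         \
    }                                                            \
  }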

// Procedure: _set_up_module_work
inline void Executor::_set_up_module_work(Node* node, bool& ept) {

28 changes: 27 additions & 1 deletion taskflow/core/flow_builder.hpp
@@ -59,6 +59,20 @@ class FlowBuilder {
template <typename C>
std::enable_if_t<is_condition_task_v<C>, Task> emplace(C&& callable);

#ifdef TF_ENABLE_CUDA
/**
@brief creates a cudaflow task from a given callable object
@tparam C callable type
@param callable a callable object acceptable to std::function<void(cudaFlow&)>
@return Task handle
*/
template <typename C>
std::enable_if_t<is_cudaflow_task_v<C>, Task> emplace(C&& callable);
#endif

/**
@brief creates multiple tasks from a list of callable objects at one time
@@ -437,7 +451,7 @@ std::enable_if_t<is_dynamic_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
}

// Function: emplace
// emplaces a condition task
template <typename C>
std::enable_if_t<is_condition_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
auto n = _graph.emplace_back(
@@ -446,6 +460,18 @@ std::enable_if_t<is_condition_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
return Task(n);
}

#ifdef TF_ENABLE_CUDA
// Function: emplace
// emplaces a cudaflow task
template <typename C>
std::enable_if_t<is_cudaflow_task_v<C>, Task> FlowBuilder::emplace(C&& c) {
auto n = _graph.emplace_back(
nstd::in_place_type_t<Node::cudaFlowWork>{}, std::forward<C>(c)
);
return Task(n);
}
#endif
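
The trait is_cudaflow_task_v used in the enable_if above is declared in a header not shown on this page. It presumably checks that the callable can be invoked with a cudaFlow reference; a minimal sketch of such a trait (an assumption, written with C++17's std::is_invocable_r_v for brevity) would be:

// assumed sketch; the library's actual trait definition is not part of this diff
// (requires <type_traits>)
template <typename C>
constexpr bool is_cudaflow_task_v = std::is_invocable_r_v<void, C, cudaFlow&>;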

//template <typename C>
//Task FlowBuilder::emplace(C&& c) {
//
(remaining 9 changed files not shown)
