Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 31 additions & 30 deletions include/graybat/communicationPolicy/bmpi/Event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,47 @@
#include <boost/mpi/environment.hpp>

namespace graybat {

namespace communicationPolicy {

namespace bmpi {


/**
* @brief An event is returned by non-blocking
* communication operations and can be
* asked whether an operation has finished
* or it can be waited for this operation to
* be finished.
*
*/
class Event {
typedef unsigned Tag;
/**
* @brief An event is returned by non-blocking
* communication operations and can be
* asked whether an operation has finished
* or it can be waited for this operation to
* be finished.
*
*/
class Event {
typedef unsigned Tag;
typedef unsigned VAddr;

public:
Event(boost::mpi::request request) : request(request), async(true){

}
public:
Event(boost::mpi::request request) : request(request), async(true){

}

Event(boost::mpi::status status) : status(status), async(false){

}

Event& operator=(const Event&) = default;

~Event(){
~Event(){

}
}

void wait(){
void wait(){
if(async){
request.wait();
}

}

bool ready(){
}

bool ready(){
if(async){
boost::optional<boost::mpi::status> status = request.test();

Expand All @@ -75,7 +76,7 @@ namespace graybat {
}
return true;

}
}

VAddr source(){
if(async){
Expand All @@ -92,18 +93,18 @@ namespace graybat {

}

private:
boost::mpi::request request;
private:
boost::mpi::request request;
boost::mpi::status status;
const bool async;
bool async;





};
};

} // namespace bmpi

} // namespace communicationPolicy

} // namespace graybat

58 changes: 30 additions & 28 deletions include/graybat/communicationPolicy/zmq/Event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,17 @@ namespace graybat {

namespace zmq {

/**
* @brief An event is returned by non-blocking
* communication operations and can be
* asked whether an operation has finished
* or it can be waited for this operation to
* be finished.
*
*/
/**
* @brief An event is returned by non-blocking
* communication operations and can be
* asked whether an operation has finished
* or it can be waited for this operation to
* be finished.
*
*/
template <typename T_CP>
class Event {
public:
class Event {
public:

using ContextID = typename graybat::communicationPolicy::ContextID<T_CP>;
using VAddr = typename graybat::communicationPolicy::VAddr<T_CP>;
Expand All @@ -51,35 +51,37 @@ namespace graybat {
using MsgID = typename graybat::communicationPolicy::MsgID<T_CP>;
using Context = typename graybat::communicationPolicy::Context<T_CP>;

Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_CP& comm) :
Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_CP& comm) :
msgID(msgID),
context(context),
vAddr(vAddr),
tag(tag),
buf(nullptr),
size(0),
done(false),
comm(comm) {
comm(&comm) {

}
}

template<typename T_Buf>
Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_Buf & buf, bool done, T_CP& comm) :
template<typename T_Buf>
Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_Buf & buf, bool done, T_CP& comm) :
msgID(msgID),
context(context),
vAddr(vAddr),
tag(tag),
buf(reinterpret_cast<std::int8_t*>(buf.data())),
size(sizeof(typename T_Buf::value_type) * buf.size()),
done(done),
comm(comm) {
comm(&comm) {

}

}
Event& operator=(const Event&) = default;

void wait(){
void wait(){
while(!ready());

}
}

bool ready(){
//std::cout << "ready? size: " << size << std::endl;
Expand All @@ -89,19 +91,19 @@ namespace graybat {
else {
if(buf == nullptr){
// asyncSend Event
done = comm.ready(msgID, context, vAddr, tag);
done = comm->ready(msgID, context, vAddr, tag);
}
else {
// asyncRecv Event
done = comm.asyncRecvImpl(MsgType::PEER, context, vAddr, tag, buf, size);
done = comm->asyncRecvImpl(MsgType::PEER, context, vAddr, tag, buf, size);
}
}
return done;
}

VAddr source(){
return vAddr;
}
return vAddr;
}

Tag getTag(){
return tag;
Expand All @@ -113,17 +115,17 @@ namespace graybat {
VAddr vAddr;
Tag tag;
std::int8_t * buf;
const size_t size;
size_t size;
bool done;
T_CP& comm;
T_CP * comm;




};
};

} // zmq

} // namespace communicationPolicy

} // namespace graybat
83 changes: 82 additions & 1 deletion test/CageUT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,90 @@ BOOST_AUTO_TEST_CASE( asyncSend_recv ){

}

BOOST_AUTO_TEST_CASE( asyncSend_asyncRecv ){
hana::for_each(cages, [](auto cageRef){
// Test setup
using Cage = typename decltype(cageRef)::type;
using GP = typename Cage::GraphPolicy;
using Event = typename Cage::Event;
using Vertex = typename Cage::Vertex;
using Edge = typename Cage::Edge;

// Test run
{

auto& cage = cageRef.get();

cage.setGraph(graybat::pattern::FullyConnected<GP>(cage.getPeers().size()));
cage.distribute(graybat::mapping::Consecutive());

const unsigned nElements = 1000;

for(unsigned run_i = 0; run_i < nRuns; ++run_i){
std::vector<Event> sendEvents;
std::vector<Event> recvEvents;
std::vector<unsigned> send(nElements,0);
std::vector<std::vector<unsigned>> recv(0);

for(unsigned i = 0; i < send.size();++i){
send.at(i) = i;
}

// Send state to neighbor cells
for(Vertex &v : cage.hostedVertices){
for(Edge edge : cage.getOutEdges(v)){
cage.send(edge, send, sendEvents);
}
}

// Recv state from neighbor cells
for(Vertex &v : cage.hostedVertices){
for(Edge edge : cage.getInEdges(v)){
recv.push_back(std::vector<unsigned>(nElements,0));
cage.recv(edge, recv.back(), recvEvents);
}
}

// Wait to finish send events
for(unsigned i = 0; i < sendEvents.size(); ++i){
sendEvents.back().wait();
sendEvents.pop_back();
}

// Wait to finish recv events
while(true){
if(recvEvents.empty()) break;
for(auto it = recvEvents.begin(); it != recvEvents.end();) {
if ((*it).ready()) {
it = recvEvents.erase(it);
}
else {
it++;
}
}
}

// for(unsigned i = 0; i < recvEvents.size(); ++i){
// recvEvents.back().wait();
// recvEvents.pop_back();
// }

// Check values of async received data
for(auto v : recv){
for(unsigned i = 0; i < v.size();++i){
BOOST_CHECK_EQUAL(v.at(i), i);
}
}

// progress.print(nRuns, run_i);

}

}

});


}

BOOST_AUTO_TEST_SUITE_END()

Expand Down