Skip to content

Commit c03aafa

Browse files
committed
test case for async recv of cage
1 parent 74696d9 commit c03aafa

File tree

3 files changed

+143
-59
lines changed

3 files changed

+143
-59
lines changed

include/graybat/communicationPolicy/bmpi/Event.hpp

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,46 +3,47 @@
33
#include <boost/mpi/environment.hpp>
44

55
namespace graybat {
6-
6+
77
namespace communicationPolicy {
88

99
namespace bmpi {
1010

1111

12-
/**
13-
* @brief An event is returned by non-blocking
14-
* communication operations and can be
15-
* asked whether an operation has finished
16-
* or it can be waited for this operation to
17-
* be finished.
18-
*
19-
*/
20-
class Event {
21-
typedef unsigned Tag;
12+
/**
13+
* @brief An event is returned by non-blocking
14+
* communication operations and can be
15+
* asked whether an operation has finished
16+
* or it can be waited for this operation to
17+
* be finished.
18+
*
19+
*/
20+
class Event {
21+
typedef unsigned Tag;
2222
typedef unsigned VAddr;
23-
24-
public:
25-
Event(boost::mpi::request request) : request(request), async(true){
2623

27-
}
24+
public:
25+
Event(boost::mpi::request request) : request(request), async(true){
26+
27+
}
2828

2929
Event(boost::mpi::status status) : status(status), async(false){
3030

3131
}
3232

33+
Event& operator=(const Event&) = default;
3334

34-
~Event(){
35+
~Event(){
3536

36-
}
37+
}
3738

38-
void wait(){
39+
void wait(){
3940
if(async){
4041
request.wait();
4142
}
42-
43-
}
4443

45-
bool ready(){
44+
}
45+
46+
bool ready(){
4647
if(async){
4748
boost::optional<boost::mpi::status> status = request.test();
4849

@@ -55,7 +56,7 @@ namespace graybat {
5556
}
5657
return true;
5758

58-
}
59+
}
5960

6061
VAddr source(){
6162
if(async){
@@ -72,18 +73,18 @@ namespace graybat {
7273

7374
}
7475

75-
private:
76-
boost::mpi::request request;
76+
private:
77+
boost::mpi::request request;
7778
boost::mpi::status status;
78-
const bool async;
79+
bool async;
80+
81+
7982

80-
81-
82-
};
83+
};
8384

8485
} // namespace bmpi
85-
86+
8687
} // namespace communicationPolicy
87-
88+
8889
} // namespace graybat
8990

include/graybat/communicationPolicy/zmq/Event.hpp

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,17 @@ namespace graybat {
1212

1313
namespace zmq {
1414

15-
/**
16-
* @brief An event is returned by non-blocking
17-
* communication operations and can be
18-
* asked whether an operation has finished
19-
* or it can be waited for this operation to
20-
* be finished.
21-
*
22-
*/
15+
/**
16+
* @brief An event is returned by non-blocking
17+
* communication operations and can be
18+
* asked whether an operation has finished
19+
* or it can be waited for this operation to
20+
* be finished.
21+
*
22+
*/
2323
template <typename T_CP>
24-
class Event {
25-
public:
24+
class Event {
25+
public:
2626

2727
using ContextID = typename graybat::communicationPolicy::ContextID<T_CP>;
2828
using VAddr = typename graybat::communicationPolicy::VAddr<T_CP>;
@@ -31,35 +31,37 @@ namespace graybat {
3131
using MsgID = typename graybat::communicationPolicy::MsgID<T_CP>;
3232
using Context = typename graybat::communicationPolicy::Context<T_CP>;
3333

34-
Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_CP& comm) :
34+
Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_CP& comm) :
3535
msgID(msgID),
3636
context(context),
3737
vAddr(vAddr),
3838
tag(tag),
3939
buf(nullptr),
4040
size(0),
4141
done(false),
42-
comm(comm) {
42+
comm(&comm) {
4343

44-
}
44+
}
4545

46-
template<typename T_Buf>
47-
Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_Buf & buf, bool done, T_CP& comm) :
46+
template<typename T_Buf>
47+
Event(MsgID msgID, Context context, VAddr vAddr, Tag tag, T_Buf & buf, bool done, T_CP& comm) :
4848
msgID(msgID),
4949
context(context),
5050
vAddr(vAddr),
5151
tag(tag),
5252
buf(reinterpret_cast<std::int8_t*>(buf.data())),
5353
size(sizeof(typename T_Buf::value_type) * buf.size()),
5454
done(done),
55-
comm(comm) {
55+
comm(&comm) {
56+
57+
}
5658

57-
}
59+
Event& operator=(const Event&) = default;
5860

59-
void wait(){
61+
void wait(){
6062
while(!ready());
6163

62-
}
64+
}
6365

6466
bool ready(){
6567
//std::cout << "ready? size: " << size << std::endl;
@@ -69,19 +71,19 @@ namespace graybat {
6971
else {
7072
if(buf == nullptr){
7173
// asyncSend Event
72-
done = comm.ready(msgID, context, vAddr, tag);
74+
done = comm->ready(msgID, context, vAddr, tag);
7375
}
7476
else {
7577
// asyncRecv Event
76-
done = comm.asyncRecvImpl(MsgType::PEER, context, vAddr, tag, buf, size);
78+
done = comm->asyncRecvImpl(MsgType::PEER, context, vAddr, tag, buf, size);
7779
}
7880
}
7981
return done;
8082
}
8183

8284
VAddr source(){
83-
return vAddr;
84-
}
85+
return vAddr;
86+
}
8587

8688
Tag getTag(){
8789
return tag;
@@ -93,17 +95,17 @@ namespace graybat {
9395
VAddr vAddr;
9496
Tag tag;
9597
std::int8_t * buf;
96-
const size_t size;
98+
size_t size;
9799
bool done;
98-
T_CP& comm;
100+
T_CP * comm;
99101

100102

101103

102104

103-
};
105+
};
104106

105107
} // zmq
106-
108+
107109
} // namespace communicationPolicy
108-
110+
109111
} // namespace graybat

test/CageUT.cpp

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,90 @@ BOOST_AUTO_TEST_CASE( asyncSend_recv ){
306306

307307
}
308308

309+
BOOST_AUTO_TEST_CASE( asyncSend_asyncRecv ){
310+
hana::for_each(cages, [](auto cageRef){
311+
// Test setup
312+
using Cage = typename decltype(cageRef)::type;
313+
using GP = typename Cage::GraphPolicy;
314+
using Event = typename Cage::Event;
315+
using Vertex = typename Cage::Vertex;
316+
using Edge = typename Cage::Edge;
317+
318+
// Test run
319+
{
320+
321+
auto& cage = cageRef.get();
322+
323+
cage.setGraph(graybat::pattern::FullyConnected<GP>(cage.getPeers().size()));
324+
cage.distribute(graybat::mapping::Consecutive());
325+
326+
const unsigned nElements = 1000;
327+
328+
for(unsigned run_i = 0; run_i < nRuns; ++run_i){
329+
std::vector<Event> sendEvents;
330+
std::vector<Event> recvEvents;
331+
std::vector<unsigned> send(nElements,0);
332+
std::vector<std::vector<unsigned>> recv(0);
333+
334+
for(unsigned i = 0; i < send.size();++i){
335+
send.at(i) = i;
336+
}
337+
338+
// Send state to neighbor cells
339+
for(Vertex &v : cage.hostedVertices){
340+
for(Edge edge : cage.getOutEdges(v)){
341+
cage.send(edge, send, sendEvents);
342+
}
343+
}
344+
345+
// Recv state from neighbor cells
346+
for(Vertex &v : cage.hostedVertices){
347+
for(Edge edge : cage.getInEdges(v)){
348+
recv.push_back(std::vector<unsigned>(nElements,0));
349+
cage.recv(edge, recv.back(), recvEvents);
350+
}
351+
}
352+
353+
// Wait to finish send events
354+
for(unsigned i = 0; i < sendEvents.size(); ++i){
355+
sendEvents.back().wait();
356+
sendEvents.pop_back();
357+
}
358+
359+
// Wait to finish recv events
360+
while(true){
361+
if(recvEvents.empty()) break;
362+
for(auto it = recvEvents.begin(); it != recvEvents.end();) {
363+
if ((*it).ready()) {
364+
it = recvEvents.erase(it);
365+
}
366+
else {
367+
it++;
368+
}
369+
}
370+
}
371+
372+
// for(unsigned i = 0; i < recvEvents.size(); ++i){
373+
// recvEvents.back().wait();
374+
// recvEvents.pop_back();
375+
// }
376+
377+
// Check values of async received data
378+
for(auto v : recv){
379+
for(unsigned i = 0; i < v.size();++i){
380+
BOOST_CHECK_EQUAL(v.at(i), i);
381+
}
382+
}
383+
384+
// progress.print(nRuns, run_i);
385+
386+
}
387+
388+
}
309389

390+
});
310391

311-
392+
}
312393

313394
BOOST_AUTO_TEST_SUITE_END()
314395

0 commit comments

Comments
 (0)