Skip to content

Commit 74696d9

Browse files
committed
Merge pull request ComputationalRadiationPhysics#102 from erikzenker/topic-hotplug
ContextID generalized
2 parents 881dd0b + 58ff3f5 commit 74696d9

File tree

12 files changed, with 207 additions and 141 deletions

include/graybat/communicationPolicy/BMPI.hpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ namespace graybat {
5353
using type = graybat::communicationPolicy::bmpi::Context<BMPI>;
5454
};
5555

56-
template<>
56+
template<>
57+
struct ContextIDType<BMPI> {
58+
using type = unsigned;
59+
};
60+
61+
template<>
5762
struct EventType<BMPI> {
5863
using type = graybat::communicationPolicy::bmpi::Event;
5964
};

include/graybat/communicationPolicy/Traits.hpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ namespace graybat {
99
template <typename T_CommunicationPolicy>
1010
struct ContextType;
1111

12+
template <typename T_CommunicationPolicy>
13+
struct ContextIDType;
14+
1215
template <typename T_CommunicationPolicy>
1316
struct EventType;
1417

@@ -23,9 +26,7 @@ namespace graybat {
2326
template <typename T_CommunicationPolicy>
2427
using Tag = unsigned;
2528

26-
template <typename T_CommunicationPolicy>
27-
using ContextID = unsigned;
28-
29+
2930
enum class MsgTypeType : std::int8_t { VADDR_REQUEST = 0,
3031
VADDR_LOOKUP = 1,
3132
DESTRUCT = 2,
@@ -46,6 +47,10 @@ namespace graybat {
4647
template <typename T_CommunicationPolicy>
4748
using Context = typename traits::ContextType<T_CommunicationPolicy>::type;
4849

50+
template <typename T_CommunicationPolicy>
51+
using ContextID = typename traits::ContextIDType<T_CommunicationPolicy>::type;
52+
53+
4954
template <typename T_CommunicationPolicy>
5055
using Event = typename traits::EventType<T_CommunicationPolicy>::type;
5156

include/graybat/communicationPolicy/ZMQ.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ namespace graybat {
4545
using type = graybat::communicationPolicy::zmq::Context<ZMQ>;
4646
};
4747

48+
template<>
49+
struct ContextIDType<ZMQ> {
50+
using type = unsigned;
51+
};
52+
4853
template<>
4954
struct EventType<ZMQ> {
5055
using type = graybat::communicationPolicy::zmq::Event<ZMQ>;
@@ -143,12 +148,13 @@ namespace graybat {
143148
*
144149
***************************************************************************/
145150

146-
void createSocketsToPeers(){
147-
for(auto const &vAddr : initialContext){
148-
sendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
149-
ctrlSendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
150-
}
151+
void createSocketsToPeers(){
152+
for(auto const &vAddr : initialContext){
153+
(void)vAddr;
154+
sendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
155+
ctrlSendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
151156
}
157+
}
152158

153159
template <typename T_Socket>
154160
void connectToSocket(T_Socket& socket, std::string const signalingUri) {

include/graybat/communicationPolicy/socket/Base.hpp

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,12 @@ namespace graybat {
4444
using Message = graybat::communicationPolicy::socket::Message<CommunicationPolicy>;
4545
using Uri = graybat::communicationPolicy::socket::Uri<CommunicationPolicy>;
4646
using Socket = graybat::communicationPolicy::socket::Socket<CommunicationPolicy>;
47+
using ContextName = graybat::communicationPolicy::socket::ContextName<CommunicationPolicy>;
4748

4849
// Members
4950
const Uri masterUri;
5051
const size_t contextSize;
52+
const ContextName contextName;
5153
unsigned maxMsgID;
5254
std::mutex sendMtx;
5355
std::map<ContextID, std::map<VAddr, std::size_t> > sendSocketMappings;
@@ -62,6 +64,7 @@ namespace graybat {
6264
std::map<ContextID, std::map<VAddr, Uri> > ctrlPhoneBook;
6365
std::map<ContextID, std::map<Uri, VAddr> > inversePhoneBook;
6466
std::map<ContextID, std::map<Uri, VAddr> > inverseCtrlPhoneBook;
67+
std::map<ContextID, ContextName> contextNames;
6568

6669
std::thread recvHandler;
6770
std::thread ctrlHandler;
@@ -161,7 +164,7 @@ namespace graybat {
161164
ContextID getInitialContextID(T_Socket& socket, size_t const contextSize);
162165

163166
template <typename T_Socket>
164-
ContextID getContextID(T_Socket& socket, size_t const size);
167+
ContextID getContextID(T_Socket& socket, ContextName const contextName);
165168

166169
template <typename T_Socket>
167170
VAddr getVAddr(T_Socket &socket, ContextID const contextID, Uri const uri, Uri const ctrlUri);
@@ -199,6 +202,7 @@ namespace graybat {
199202
Base<T_CommunicationPolicy>::Base(Config const config) :
200203
masterUri(config.masterUri),
201204
contextSize(config.contextSize),
205+
contextName(config.contextName),
202206
maxMsgID(0),
203207
inBox(config.maxBufferSize),
204208
ctrlBox(config.maxBufferSize){
@@ -219,7 +223,8 @@ namespace graybat {
219223
static_cast<CommunicationPolicy*>(this)->connectToSocket(static_cast<CommunicationPolicy*>(this)->signalingSocket, masterUri);
220224

221225
// Retrieve Context id for initial context from signaling process
222-
ContextID contextID = getInitialContextID(static_cast<CommunicationPolicy*>(this)->signalingSocket, contextSize);
226+
ContextID contextID = getContextID(static_cast<CommunicationPolicy*>(this)->signalingSocket, contextName);
227+
contextNames[contextID] = contextName;
223228

224229
// Retrieve own vAddr from signaling process for initial context
225230
VAddr vAddr = getVAddr(static_cast<CommunicationPolicy*>(this)->signalingSocket, contextID, static_cast<CommunicationPolicy*>(this)->peerUri, static_cast<CommunicationPolicy*>(this)->ctrlUri);
@@ -260,7 +265,7 @@ namespace graybat {
260265
auto Base<T_CommunicationPolicy>::deinit()
261266
-> void {
262267
std::stringstream ss;
263-
ss << static_cast<size_t>(MsgType::DESTRUCT);
268+
ss << static_cast<size_t>(MsgType::DESTRUCT) << " " << contextName;
264269
static_cast<CommunicationPolicy*>(this)->sendToSocket(static_cast<CommunicationPolicy*>(this)->signalingSocket, ss);
265270

266271
std::array<unsigned, 1> null;
@@ -404,26 +409,31 @@ namespace graybat {
404409

405410
// Peer with VAddr 0 collects new members
406411
if( oldContext.getVAddr() == masterVAddr){
407-
std::array<unsigned, 2> nMembers {{ 0 }};
408-
std::vector<VAddr> vAddrs;
412+
std::array<ContextID, 1> newContextID {{ 0 }};
413+
std::array<unsigned, 1> newContextSize {{ 0 }};
414+
std::vector<VAddr> newContextWhiteList(0, 0);
409415

410416
for(auto const &vAddr : oldContext){
411417
std::array<unsigned, 1> remoteIsMember {{ 0 }};
412418
//std::cout << "Recv remoteIsMember: " << vAddr << std::endl;
413419
static_cast<CommunicationPolicy*>(this)->recvImpl(MsgType::SPLIT, oldContext, vAddr, 0, remoteIsMember);
414420

415421
if(remoteIsMember[0]) {
416-
nMembers[0]++;
417-
vAddrs.push_back(vAddr);
422+
newContextWhiteList.push_back(vAddr);
418423

419424
}
420425

421426
}
422427

423-
nMembers[1] = getContextID(static_cast<CommunicationPolicy*>(this)->signalingSocket, nMembers[0]);
428+
ContextName newContextName = contextName + "_" + std::to_string(std::rand());
429+
newContextID[0] = getContextID(static_cast<CommunicationPolicy*>(this)->signalingSocket, newContextName);
430+
contextNames[newContextID[0]] = newContextName;
431+
newContextSize[0] = newContextWhiteList.size();
424432

425-
for(VAddr vAddr : vAddrs){
426-
static_cast<CommunicationPolicy*>(this)->asyncSendImpl(MsgType::SPLIT, getMsgID(), oldContext, vAddr, 0, nMembers);
433+
for(VAddr vAddr : newContextWhiteList){
434+
static_cast<CommunicationPolicy*>(this)->asyncSendImpl(MsgType::SPLIT, getMsgID(), oldContext, vAddr, 0, newContextID);
435+
static_cast<CommunicationPolicy*>(this)->asyncSendImpl(MsgType::SPLIT, getMsgID(), oldContext, vAddr, 0, newContextSize);
436+
static_cast<CommunicationPolicy*>(this)->asyncSendImpl(MsgType::SPLIT, getMsgID(), oldContext, vAddr, 0, newContextWhiteList);
427437

428438
}
429439

@@ -432,29 +442,35 @@ namespace graybat {
432442
//std::cout << oldContext.getVAddr() << " check 0" << std::endl;
433443

434444
if(isMember){
435-
std::array<unsigned, 2> nMembers {{ 0 , 0 }};
445+
std::array<ContextID, 1> newContextID {{ 0 }};
446+
std::array<unsigned, 1> newContextSize {{ 0 }};
436447

437-
static_cast<CommunicationPolicy*>(this)->recvImpl(MsgType::SPLIT, oldContext, 0, 0, nMembers);
438-
ContextID newContextID = nMembers[1];
448+
static_cast<CommunicationPolicy*>(this)->recvImpl(MsgType::SPLIT, oldContext, 0, 0, newContextID);
449+
static_cast<CommunicationPolicy*>(this)->recvImpl(MsgType::SPLIT, oldContext, 0, 0, newContextSize);
439450

440-
newContext = Context(newContextID, getVAddr(static_cast<CommunicationPolicy*>(this)->signalingSocket, newContextID, static_cast<CommunicationPolicy*>(this)->peerUri, static_cast<CommunicationPolicy*>(this)->ctrlUri), nMembers[0]);
451+
std::vector<VAddr> newContextWhiteList(newContextSize[0], 0);
452+
453+
static_cast<CommunicationPolicy*>(this)->recvImpl(MsgType::SPLIT, oldContext, 0, 0, newContextWhiteList);
454+
455+
newContext = Context(newContextID[0], oldContext.getVAddr(), newContextWhiteList);
441456
contexts[newContext.getID()] = newContext;
442457

443458
//std::cout << oldContext.getVAddr() << " check 1" << std::endl;
444459
// Update phonebook for new context
445460
for(auto const &vAddr : newContext){
446-
Uri remoteUri;
447-
Uri ctrlUri;
448-
std::tie(remoteUri, ctrlUri) = getUri(static_cast<CommunicationPolicy*>(this)->signalingSocket, newContext.getID(), vAddr);
461+
Uri remoteUri = phoneBook[oldContext.getID()][vAddr];
462+
Uri ctrlUri = ctrlPhoneBook[oldContext.getID()][vAddr];
449463
phoneBook[newContext.getID()][vAddr] = remoteUri;
450-
inversePhoneBook[newContext.getID()][remoteUri] = vAddr;
464+
ctrlPhoneBook[newContext.getID()][vAddr] = ctrlUri;
465+
inversePhoneBook[newContext.getID()][remoteUri] = inversePhoneBook[oldContext.getID()][remoteUri];
466+
inverseCtrlPhoneBook[newContext.getID()][ctrlUri] = inverseCtrlPhoneBook[oldContext.getID()][ctrlUri];
451467

452468
}
453469

454470
//std::cout << oldContext.getVAddr() << " check 2" << std::endl;
455471
// Create mappings to sockets for new context
456472
for(auto const &vAddr : newContext){
457-
Uri uri = phoneBook.at(newContext.getID()).at(vAddr);
473+
Uri uri = phoneBook.at(oldContext.getID()).at(vAddr);
458474
VAddr oldVAddr = inversePhoneBook.at(oldContext.getID()).at(uri);
459475
sendSocketMappings[newContext.getID()][vAddr] = sendSocketMappings.at(oldContext.getID()).at(oldVAddr);
460476

@@ -505,15 +521,15 @@ namespace graybat {
505521

506522
template <typename T_CommunicationPolicy>
507523
template <typename T_Socket>
508-
auto Base<T_CommunicationPolicy>::getContextID(T_Socket& socket, size_t const size)
524+
auto Base<T_CommunicationPolicy>::getContextID(T_Socket& socket, ContextName const contextName)
509525
-> graybat::communicationPolicy::ContextID<T_CommunicationPolicy> {
510526
using ContextID = graybat::communicationPolicy::ContextID<T_CommunicationPolicy>;
511527

512528
ContextID contextID = 0;
513529

514530
// Send vAddr request
515531
std::stringstream ss;
516-
ss << static_cast<size_t>(MsgType::CONTEXT_REQUEST) << " " << size;;
532+
ss << static_cast<size_t>(MsgType::CONTEXT_REQUEST) << " " << contextName << " ";
517533

518534
static_cast<CommunicationPolicy*>(this)->sendToSocket(socket, ss);
519535

include/graybat/communicationPolicy/socket/Traits.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ namespace graybat {
2828

2929
template <typename T_CommunicationPolicy>
3030
using Message = typename traits::MessageType<T_CommunicationPolicy>::type;
31+
32+
template <typename T_CommunicationPolicy>
33+
using ContextName = std::string;
3134

3235
} // namespace socket
3336

include/graybat/communicationPolicy/zmq/Config.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace graybat {
1111
std::string masterUri;
1212
std::string peerUri;
1313
size_t contextSize;
14+
std::string contextName = "context";
1415
size_t maxBufferSize = 100 * 1000 * 1000;
1516
};
1617

include/graybat/communicationPolicy/zmq/Context.hpp

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
#pragma once
22

3+
// STL
4+
#include <numeric>
5+
6+
// GRAYBAT
37
#include <graybat/communicationPolicy/Traits.hpp>
48
#include <graybat/communicationPolicy/zmq/VAddrIterator.hpp>
59

@@ -28,16 +32,29 @@ namespace graybat {
2832
contextID(0),
2933
vAddr(0),
3034
nPeers(1),
31-
isValid(false){
35+
isValid(false),
36+
peers(0){
3237

3338
}
3439

3540
Context(ContextID contextID, VAddr vAddr, unsigned nPeers) :
3641
contextID(contextID),
3742
vAddr(vAddr),
3843
nPeers(nPeers),
39-
isValid(true){
40-
44+
isValid(true),
45+
peers(0){
46+
47+
48+
peers.resize(nPeers);
49+
std::iota(peers.begin(), peers.end(), 0);
50+
}
51+
52+
Context(ContextID contextID, VAddr vAddr, std::vector<VAddr> peers) :
53+
contextID(contextID),
54+
vAddr(vAddr),
55+
nPeers(peers.size()),
56+
isValid(true),
57+
peers(peers) {
4158
}
4259

4360
size_t size() const{
@@ -56,27 +73,28 @@ namespace graybat {
5673
return isValid;
5774
}
5875

59-
VAddrIterator<T_CP> begin(){
60-
return VAddrIterator<T_CP>(0);
76+
std::vector<VAddr>::iterator begin(){
77+
return peers.begin();
6178
}
6279

63-
VAddrIterator<T_CP> begin() const {
64-
return VAddrIterator<T_CP>(0);
80+
std::vector<VAddr>::const_iterator begin() const {
81+
return peers.cbegin();
6582
}
66-
67-
VAddrIterator<T_CP> end(){
68-
return VAddrIterator<T_CP>(size());
83+
84+
std::vector<VAddr>::iterator end(){
85+
return peers.end();
6986
}
7087

71-
VAddrIterator<T_CP> end() const {
72-
return VAddrIterator<T_CP>(size());
88+
std::vector<VAddr>::const_iterator end() const {
89+
return peers.cend();
7390
}
7491

7592
private:
7693
ContextID contextID;
7794
VAddr vAddr;
7895
unsigned nPeers;
79-
bool isValid;
96+
bool isValid;
97+
std::vector<VAddr> peers;
8098
};
8199

82100

test/CageUT.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ using BMPIConfig = BMPI::Config;
8484

8585
ZMQConfig zmqConfig = {"tcp://127.0.0.1:5000",
8686
"tcp://127.0.0.1:5001",
87-
static_cast<size_t>(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE")))};
87+
static_cast<size_t>(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE"))),
88+
"context_cage_test"};
8889

8990
BMPIConfig bmpiConfig;
9091

test/CommunicationPolicyUT.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ using BMPIConfig = BMPI::Config;
2828

2929
ZMQConfig zmqConfig = {"tcp://127.0.0.1:5000",
3030
"tcp://127.0.0.1:5001",
31-
static_cast<size_t>(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE")))};
31+
static_cast<size_t>(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE"))),
32+
"context_cp_test"};
3233

3334
BMPIConfig bmpiConfig;
3435

test/EdgeUT.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ using BMPIConfig = BMPI::Config;
3535

3636
ZMQConfig zmqConfig = {"tcp://127.0.0.1:5000",
3737
"tcp://127.0.0.1:5001",
38-
static_cast<size_t>(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE")))};
38+
static_cast<size_t>(std::stoi(std::getenv("OMPI_COMM_WORLD_SIZE"))),
39+
"context_edge_test"};
3940

4041
BMPIConfig bmpiConfig;
4142

0 commit comments

Comments (0)