Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ add_executable(check EXCLUDE_FROM_ALL ${TESTS})
target_link_libraries(check ${LIBS})

# Fast testing
file(GLOB TESTSFAST test/UT.cpp test/CommunicationPolicyUT.cpp)
file(GLOB TESTSFAST test/UT.cpp test/UpdateUT.cpp)
add_executable(checkFast EXCLUDE_FROM_ALL ${TESTSFAST})
target_link_libraries(checkFast ${LIBS})

Expand Down Expand Up @@ -134,4 +134,4 @@ foreach(P ${_SOURCES_PRE})
set(_SOURCES ${_SOURCES} ${P})
endif()
endforeach()
ADD_LIBRARY(CLION_DUMMY_TARGET EXCLUDE_FROM_ALL ${_SOURCES})
ADD_LIBRARY(CLION_DUMMY_TARGET EXCLUDE_FROM_ALL ${_SOURCES})
11 changes: 10 additions & 1 deletion include/graybat/communicationPolicy/Base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,15 @@ namespace graybat {
* @brief Returns a subcontext of *oldContext* with all peers that want to participate.
* Peers which do not want to participate retrieve an invalid context.
*/
Context splitContext(const bool isMember, const Context oldContext) = delete;
Context splitContext(const bool isMember, const Context oldContext) = delete;

/**
* @brief Returns the context that contains all peers
*/
Context getGlobalContext() = delete;

Context updateContext(const Context oldContext);

/** @} */


Expand Down Expand Up @@ -531,6 +533,13 @@ namespace graybat {

}

template <typename T_CommunicationPolicy>
auto Base<T_CommunicationPolicy>::updateContext(const Context oldContext)
-> Context {
return oldContext;
}


} // namespace communicationPolicy

} // namespace graybat
Expand Down
22 changes: 12 additions & 10 deletions include/graybat/communicationPolicy/Traits.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ namespace graybat {
using Tag = unsigned;


enum class MsgTypeType : std::int8_t { VADDR_REQUEST = 0,
VADDR_LOOKUP = 1,
DESTRUCT = 2,
RETRY = 3,
ACK = 4,
CONTEXT_INIT = 5,
CONTEXT_REQUEST = 6,
PEER = 7,
CONFIRM = 8,
SPLIT = 9};
enum class MsgTypeType : std::int8_t {
VADDR_REQUEST = 0,
VADDR_LOOKUP = 1,
DESTRUCT = 2,
RETRY = 3,
ACK = 4,
CONTEXT_INIT = 5,
CONTEXT_REQUEST = 6,
PEER = 7,
CONFIRM = 8,
SPLIT = 9,
CONTEXT_STATE = 10};

template <typename T_CommunicationPolicy>
using MsgType = MsgTypeType;
Expand Down
27 changes: 16 additions & 11 deletions include/graybat/communicationPolicy/ZMQ.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,24 @@ namespace graybat {
*
***************************************************************************/

void createSocketsToPeers(){
for(auto const &vAddr : initialContext){
(void)vAddr;
sendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
ctrlSendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
void createSocketsToPeers(Context context){
for(auto const &vAddr : context){
createSocketToPeer(vAddr);
}
}

template <typename T_Socket>
void connectToSocket(T_Socket& socket, std::string const signalingUri) {
socket.connect(signalingUri.c_str());

}

std::size_t createSocketToPeer(VAddr vAddr) {
(void)vAddr;
sendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
ctrlSendSockets.emplace_back(Socket(zmqContext, ZMQ_PUSH));
return sendSockets.size() - 1;
}

template <typename T_Socket>
void connectToSocket(T_Socket& socket, std::string const signalingUri) {
socket.connect(signalingUri.c_str());

}

template <typename T_Socket>
void recvFromSocket(T_Socket& socket, std::stringstream& ss) {
Expand Down
89 changes: 83 additions & 6 deletions include/graybat/communicationPolicy/socket/Base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ namespace graybat {
template <typename T_Socket>
void recvFromSocket (T_Socket& socket, Message & message) = delete;

void createSocketsToPeers() = delete;
void createSocketsToPeers(Context context) = delete;

std::size_t createSocketToPeer(VAddr vAddr) = delete;

// P2P INTERFACE

Expand Down Expand Up @@ -157,7 +159,7 @@ namespace graybat {
// CONTEXT INTERFACE
Context getGlobalContext();
Context splitContext(const bool isMember, const Context oldContext);

Context updateContext(const Context oldContext);

// SIGNALING METHODS
template <typename T_Socket>
Expand Down Expand Up @@ -244,7 +246,7 @@ namespace graybat {

// Create socket connection to other peers
// Create socketmapping from initial context to sockets of VAddrs
static_cast<CommunicationPolicy*>(this)->createSocketsToPeers();
static_cast<CommunicationPolicy*>(this)->createSocketsToPeers(initialContext);

for(auto const &vAddr : initialContext){
sendSocketMappings[initialContext.getID()][vAddr] = vAddr;
Expand Down Expand Up @@ -482,15 +484,17 @@ namespace graybat {
newContext = Context();
}

//static_cast<CommunicationPolicy*>(this)->synchronize(oldContext);

//std::cout << oldContext.getVAddr() << " check 3" << std::endl;

// Barrier thus recvHandler is up to date with sendSocketMappings
// Necessary in environment with multiple zmq objects
std::array<unsigned, 0> null;
for(auto const &vAddr : oldContext){
for(auto const &vAddr : oldContext){
static_cast<CommunicationPolicy*>(this)->asyncSendImpl(MsgType::SPLIT, getMsgID(), oldContext, vAddr, 0, null);
}
for(auto const &vAddr : oldContext){
for(auto const &vAddr : oldContext){
static_cast<CommunicationPolicy*>(this)->recvImpl(MsgType::SPLIT, oldContext, vAddr, 0, null);
}

Expand All @@ -499,7 +503,6 @@ namespace graybat {

}


template <typename T_CommunicationPolicy>
template <typename T_Socket>
auto Base<T_CommunicationPolicy>::getInitialContextID(T_Socket& socket, size_t const contextSize)
Expand Down Expand Up @@ -806,6 +809,80 @@ namespace graybat {

}

template <typename T_CommunicationPolicy>
auto Base<T_CommunicationPolicy>::updateContext(Context oldContext)
-> graybat::communicationPolicy::Context<T_CommunicationPolicy> {
using VAddr = graybat::communicationPolicy::VAddr<T_CommunicationPolicy>;
using Context = graybat::communicationPolicy::Context<T_CommunicationPolicy>;

size_t newContextSize(0);

// Send context state request
std::stringstream ss;
ss << static_cast<size_t>(MsgType::CONTEXT_STATE) << " " << oldContext.getID() << " ";
static_cast<CommunicationPolicy*>(this)->sendToSocket(static_cast<CommunicationPolicy*>(this)->signalingSocket, ss);

// Receive new context size
std::stringstream sss;
static_cast<CommunicationPolicy*>(this)->recvFromSocket(static_cast<CommunicationPolicy*>(this)->signalingSocket, sss);
sss >> newContextSize;

// Create context with new context size
Context updatedContext = Context(oldContext.getID(), oldContext.getVAddr(), newContextSize);

// Get Uri of each peer in the new context
// update the phonebooks for this peer
for(auto const &vAddr : updatedContext){
Uri remoteUri;
Uri ctrlUri;
std::tie(remoteUri, ctrlUri) = getUri(static_cast<CommunicationPolicy*>(this)->signalingSocket, updatedContext.getID(), vAddr);

// Create sockets to new peers here ?
if(phoneBook[updatedContext.getID()].find(vAddr) == phoneBook[updatedContext.getID()].end()){
phoneBook[updatedContext.getID()][vAddr] = remoteUri;
ctrlPhoneBook[updatedContext.getID()][vAddr] = ctrlUri;
inversePhoneBook[updatedContext.getID()][remoteUri] = vAddr;
inverseCtrlPhoneBook[updatedContext.getID()][ctrlUri] = vAddr;

std::size_t socket_t = static_cast<CommunicationPolicy*>(this)->createSocketToPeer(vAddr);
sendSocketMappings[updatedContext.getID()][vAddr] = socket_t;
static_cast<CommunicationPolicy*>(this)->connectToSocket(static_cast<CommunicationPolicy*>(this)->sendSockets.at(sendSocketMappings.at(updatedContext.getID()).at(vAddr)), phoneBook.at(updatedContext.getID()).at(vAddr).c_str());
static_cast<CommunicationPolicy*>(this)->connectToSocket(static_cast<CommunicationPolicy*>(this)->ctrlSendSockets.at(sendSocketMappings.at(updatedContext.getID()).at(vAddr)), ctrlPhoneBook.at(updatedContext.getID()).at(vAddr).c_str());

//std::cout << vAddr << " was not included in phoneBook" << std::endl;
}
else {

phoneBook[updatedContext.getID()][vAddr] = remoteUri;
ctrlPhoneBook[updatedContext.getID()][vAddr] = ctrlUri;
inversePhoneBook[updatedContext.getID()][remoteUri] = vAddr;
inverseCtrlPhoneBook[updatedContext.getID()][ctrlUri] = vAddr;

}

}

// TODO: now create the new sockets


// Create socket connection to other peers
// Create socketmapping from initial context to sockets of VAddrs
// static_cast<CommunicationPolicy*>(this)->createSocketsToPeers(updatedContext);
//
// for(auto const &vAddr : initialContext){
// sendSocketMappings[initialContext.getID()][vAddr] = vAddr;
// static_cast<CommunicationPolicy*>(this)->connectToSocket(static_cast<CommunicationPolicy*>(this)->sendSockets.at(sendSocketMappings.at(initialContext.getID()).at(vAddr)), phoneBook.at(initialContext.getID()).at(vAddr).c_str());
// static_cast<CommunicationPolicy*>(this)->connectToSocket(static_cast<CommunicationPolicy*>(this)->ctrlSendSockets.at(sendSocketMappings.at(initialContext.getID()).at(vAddr)), ctrlPhoneBook.at(initialContext.getID()).at(vAddr).c_str());
//
// //std::cout << "sendSocket_i: " << vAddr << " --> " << phoneBook.at(initialContext.getID()).at(vAddr) << std::endl;
//
// }


return updatedContext;
}


} // socket

} // communicationPolicy
Expand Down
65 changes: 65 additions & 0 deletions test/CageUT.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,71 @@ BOOST_AUTO_TEST_CASE( asyncSend_recv ){

}

BOOST_AUTO_TEST_CASE( asyncSend_asyncRecv ){
hana::for_each(cages, [](auto cageRef){
// Test setup
using Cage = typename decltype(cageRef)::type;
using GP = typename Cage::GraphPolicy;
using Event = typename Cage::Event;
using Vertex = typename Cage::Vertex;
using Edge = typename Cage::Edge;

// Test run
{

auto& cage = cageRef.get();

cage.setGraph(graybat::pattern::FullyConnected<GP>(cage.getPeers().size()));
cage.distribute(graybat::mapping::Consecutive());

const unsigned nElements = 1000;

for(unsigned run_i = 0; run_i < nRuns; ++run_i){
std::vector<Event> sendEvents;
std::vector<Event> recvEvents;
std::vector<unsigned> send(nElements,0);
std::vector<std::vector<unsigned>> recv(0);

for(unsigned i = 0; i < send.size();++i){
send.at(i) = i;
}

// Send state to neighbor cells
for(Vertex &v : cage.hostedVertices){
for(Edge edge : cage.getOutEdges(v)){
cage.send(edge, send, sendEvents);
}
}

// Recv state from neighbor cells
for(Vertex &v : cage.hostedVertices){
for(Edge edge : cage.getInEdges(v)){
recv.push_back(std::vector<unsigned>(nElements,0));
cage.recv(edge, recv, recvEvents);
}
}

// Wait to finish send events
for(unsigned i = 0; i < sendEvents.size(); ++i){
sendEvents.back().wait();
sendEvents.pop_back();
}

// Wait to finish recv events
for(unsigned i = 0; i < recvEvents.size(); ++i){
recvEvents.back().wait();
recvEvents.pop_back();
}

// progress.print(nRuns, run_i);

}

}

});

}



Expand Down
Loading