Skip to content
This repository has been archived by the owner on Feb 23, 2024. It is now read-only.

Commit

Permalink
Added broadcast primitives for connectionless
Browse files Browse the repository at this point in the history
  • Loading branch information
depaulmillz committed Mar 10, 2021
1 parent 1e5050a commit 9752784
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.0)

project(NetworkLayer VERSION 0.2.0)
project(NetworkLayer VERSION 0.2.1)

find_package(GTest CONFIG REQUIRED)

Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,8 @@ static_assert(NETWORK_VER_MAJOR == 0
&& NETWORK_VER_PATCH == 1,
"Need to ensure using the correct version");
```
## Notes
Broadcast primitives on connectionless clients and servers should be used alone
(i.e. without send and receive ever used on client and server)
122 changes: 109 additions & 13 deletions fabricBased/include/networklayer/connectionless.hh
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ namespace cse498 {
/*
* Memory region handler type
*/
using mr_t = fid_mr*;
using mr_t = fid_mr *;

using addr_t = fi_addr_t;

/**
* Free an memory region handler
* @param x memory region handler
*/
void free_mr(mr_t x){
void free_mr(mr_t x) {
ERRCHK(fi_close(&x->fid));
}

Expand Down Expand Up @@ -183,7 +183,7 @@ namespace cse498 {
* @param size size of buffer
* @param mr memory region object, not preallocated
*/
void registerMR(char* buf, size_t size, mr_t& mr){
void registerMR(char *buf, size_t size, mr_t &mr) {
ERRCHK(fi_mr_reg(domain, buf, size,
FI_WRITE | FI_REMOTE_WRITE | FI_READ | FI_REMOTE_READ, 0,
0, 0, &mr, NULL));
Expand All @@ -210,7 +210,7 @@ namespace cse498 {
}

fi_info *fi, *hints;
fid_domain* domain;
fid_domain *domain;
fid_fabric *fabric;
fi_cq_attr cq_attr;
fi_av_attr av_attr;
Expand All @@ -231,7 +231,7 @@ namespace cse498 {
* @param address connect to this address
* @param port connect to this port
*/
ConnectionlessClient(const char* address, uint16_t port) {
ConnectionlessClient(const char *address, uint16_t port) {
SPDLOG_TRACE("Getting fi provider");
hints = fi_allocinfo();
hints->caps = FI_MSG;
Expand Down Expand Up @@ -348,7 +348,7 @@ namespace cse498 {
* @param size size of buffer
* @param mr memory region
*/
void registerMR(char* buf, size_t size, mr_t& mr){
void registerMR(char *buf, size_t size, mr_t &mr) {
ERRCHK(fi_mr_reg(domain, buf, size,
FI_WRITE | FI_REMOTE_WRITE | FI_READ | FI_REMOTE_READ, 0,
0, 0, &mr, NULL));
Expand Down Expand Up @@ -376,7 +376,7 @@ namespace cse498 {

fi_addr_t remote_addr;
fi_info *fi, *hints;
fid_domain* domain;
fid_domain *domain;
fid_fabric *fabric;
fi_cq_attr cq_attr;
fi_av_attr av_attr;
Expand All @@ -397,16 +397,16 @@ namespace cse498 {
* @param addr address
* @param port port
*/
Connectionless_t(bool useServer, char* addr, int port) : isServer(useServer) {
if(isServer){
Connectionless_t(bool useServer, char *addr, int port) : isServer(useServer) {
if (isServer) {
this->server = new ConnectionlessServer(addr, port);
} else {
this->client = new ConnectionlessClient(addr, port);
}
}

~Connectionless_t(){
if(isServer){
~Connectionless_t() {
if (isServer) {
delete server;
} else {
delete client;
Expand All @@ -415,11 +415,107 @@ namespace cse498 {

bool isServer;
union {
ConnectionlessServer* server;
ConnectionlessClient* client;
ConnectionlessServer *server;
ConnectionlessClient *client;
};
};


/**
* Performs best effort broadcast
* @param c server
* @param addresses addresses to send to
* @param message message to send
* @param messageSize size of message
*/
void bestEffortBroadcast(ConnectionlessServer &c, const std::vector<addr_t> &addresses, char *message,
size_t messageSize) {
for (auto &a : addresses) {
c.send(a, message, messageSize);
}
}

/**
* Performs best effort broadcast
* @param clients clients to send to
* @param message message to send
* @param messageSize size of message
*/
void bestEffortBroadcast(std::vector<ConnectionlessClient> &clients, char *message, size_t messageSize) {
for (auto &c : clients) {
c.send(message, messageSize);
}
}

/**
* Performs best effort broadcast recieve from client
* @param clients clients to recv from
* @param buf buffer
* @param sizeOfBuf buffer size
*/
void bestEffortBroadcastReceiveFrom(ConnectionlessClient &client, char *buf, size_t sizeOfBuf) {
client.recv(buf, sizeOfBuf);
}

/**
* Performs best effort broadcast recieve from client
* @param c connection to recv from
* @param address address to recv from
* @param buf buffer
* @param sizeOfBuf buffer size
*/
void bestEffortBroadcastReceiveFrom(ConnectionlessServer &c, addr_t address, char *buf, size_t sizeOfBuf) {
c.recv(address, buf, sizeOfBuf);
}

/**
* Reliably broadcast from a server
* @param c server
* @param addresses to send to
* @param message to send
* @param messageSize size of message
*/
void reliableBroadcast(ConnectionlessServer &c, const std::vector<addr_t> &addresses, char *message,
size_t messageSize) {
bestEffortBroadcast(c, addresses, message, messageSize);
}


/**
* Reliably broadcast from a server
* @param clients clients to send to
* @param message message to send
* @param messageSize size of message
*/
void reliableBroadcast(std::vector<ConnectionlessClient> &clients, char *message, size_t messageSize) {
bestEffortBroadcast(clients, message, messageSize);
}

/**
* Receive from
* @param receiveFrom node to receive from
* @param clients clients to send to
* @param buf buffer to use (registered)
* @param bufSize size of buffer to use
* @param checkIfReceivedBefore function to check if the message has been received before
* @param markAsReceived function to mark a message as received
* @return true if it has not been received before
*/
bool reliableBroadcastReceiveFrom(ConnectionlessClient &receiveFrom, std::vector<ConnectionlessClient> &clients,
char *buf,
size_t bufSize, const std::function<bool(char *, size_t)>& checkIfReceivedBefore,
const std::function<void(char *, size_t)>& markAsReceived) {

receiveFrom.recv(buf, bufSize);

if (!checkIfReceivedBefore(buf, bufSize)) {
bestEffortBroadcast(clients, buf, bufSize);
markAsReceived(buf, bufSize);
return true;
}
return false;
}

}


Expand Down

0 comments on commit 9752784

Please sign in to comment.