Skip to content

feat: impl ConnectionManager as thread safe singleton #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/ImplicitMessagingExample.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ int main() {
auto si = std::make_shared<SessionInfo>("172.28.1.3", 0xAF12);

// Implicit messaging
ConnectionManager connectionManager;
auto& connectionManager = ConnectionManager::getInstance();

ConnectionParameters parameters;
parameters.connectionPath = {0x20, 0x04,0x24, 151, 0x2C, 150, 0x2C, 100}; // config Assm151, output Assm150, intput Assm100
Expand Down
39 changes: 23 additions & 16 deletions src/ConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,11 @@ namespace eipScanner {
FORWARD_CLOSE = 0x4E
};

ConnectionManager::ConnectionManager()
: ConnectionManager(std::make_shared<MessageRouter>()){
}

ConnectionManager::ConnectionManager(const MessageRouter::SPtr& messageRouter)
ConnectionManager::ConnectionManager(MessageRouter::SPtr messageRouter)
: _messageRouter(messageRouter)
, _connectionMap(){

, _connectionMap()
, _socketMap()
{
std::random_device rd;
std::uniform_int_distribution<cip::CipUint> dist(0, std::numeric_limits<cip::CipUint>::max());
_incarnationId = dist(rd);
Expand All @@ -52,6 +49,7 @@ namespace eipScanner {
IOConnection::WPtr
ConnectionManager::forwardOpen(const SessionInfoIf::SPtr& si, ConnectionParameters connectionParameters, bool isLarge) {
static int serialNumberCount = 0;
std::lock_guard<std::mutex> lock(_connectionMapMutex);
connectionParameters.connectionSerialNumber = ++serialNumberCount;

NetworkConnectionParametersBuilder o2tNCP(connectionParameters.o2tNetworkConnectionParams, isLarge);
Expand Down Expand Up @@ -196,6 +194,7 @@ namespace eipScanner {
<< ". But the connection is removed from ConnectionManager anyway";
}

std::lock_guard<std::mutex> lock(_connectionMapMutex);
auto rc = _connectionMap.erase(ptr->_t2oNetworkConnectionId);
(void) rc;
assert(rc);
Expand All @@ -206,15 +205,19 @@ namespace eipScanner {

void ConnectionManager::handleConnections(std::chrono::milliseconds timeout) {
std::vector<BaseSocket::SPtr > sockets;
std::transform(_socketMap.begin(), _socketMap.end(), std::back_inserter(sockets), [](auto entry) {
auto fd = entry.second->getSocketFd();
(void) fd;
return entry.second;
});
{
std::lock_guard<std::mutex> lock(_socketMapMutex);
std::transform(_socketMap.begin(), _socketMap.end(), std::back_inserter(sockets), [](auto entry) {
auto fd = entry.second->getSocketFd();
(void) fd;
return entry.second;
});
}

BaseSocket::select(sockets, timeout);

std::vector<cip::CipUdint> connectionsToClose;
std::lock_guard<std::mutex> lock(_connectionMapMutex);
for (auto& entry : _connectionMap) {
if (!entry.second->notifyTick()) {
connectionsToClose.push_back(entry.first);
Expand All @@ -227,14 +230,11 @@ namespace eipScanner {
}

UDPBoundSocket::SPtr ConnectionManager::findOrCreateSocket(const sockets::EndPoint& endPoint) {
std::lock_guard<std::mutex> lock(_socketMapMutex);
auto socket = _socketMap.find(endPoint);
if (socket == _socketMap.end()) {
auto newSocket = std::make_shared<UDPBoundSocket>(endPoint);
_socketMap[endPoint] = newSocket;
newSocket->setBeginReceiveHandler([](sockets::BaseSocket& sock) {
(void) sock;
Logger(LogLevel::DEBUG) << "Received something";
});

newSocket->setBeginReceiveHandler([this](BaseSocket& sock) {
auto recvData = sock.Receive(8192);
Expand All @@ -247,6 +247,7 @@ namespace eipScanner {
buffer >> connectionId;
Logger(LogLevel::DEBUG) << "Received data from connection T2O_ID=" << connectionId;

std::lock_guard<std::mutex> lock(_connectionMapMutex);
auto io = _connectionMap.find(connectionId);
if (io != _connectionMap.end()) {
io->second->notifyReceiveData(commonPacket.getItems().at(1).getData());
Expand All @@ -262,6 +263,12 @@ namespace eipScanner {
}

bool ConnectionManager::hasOpenConnections() const {
std::lock_guard<std::mutex> lock(_connectionMapMutex);
return !_connectionMap.empty();
}

MessageRouter::SPtr ConnectionManager::getRouter() const {
return _messageRouter;
}

}
32 changes: 25 additions & 7 deletions src/ConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define EIPSCANNER_CONNECTIONMANAGER_H

#include <map>
#include <mutex>
#include "MessageRouter.h"
#include "IOConnection.h"
#include "cip/connectionManager/ConnectionParameters.h"
Expand All @@ -22,15 +23,16 @@ namespace eipScanner {
class ConnectionManager {
public:
/**
* @brief Default constructor
* @brief get singleton instance
*/
ConnectionManager();
static ConnectionManager& getInstance(MessageRouter::SPtr messageRouter = std::make_shared<MessageRouter>())
{
static ConnectionManager instance{messageRouter};
return instance;
}

/**
* @note used fot testing
* @param messageRouter
*/
explicit ConnectionManager(const MessageRouter::SPtr& messageRouter);
ConnectionManager(ConnectionManager const&) = delete;
void operator=(ConnectionManager const&) = delete;

/**
* @brief Default destructor
Expand Down Expand Up @@ -73,9 +75,25 @@ namespace eipScanner {
* @return true if there are some opened IO connections
*/
bool hasOpenConnections() const;

/**
*
* @return messageRouter shard pointer
*/
MessageRouter::SPtr getRouter() const;

private:
/**
* @note used fot testing
* @param messageRouter
*/
ConnectionManager(MessageRouter::SPtr messageRouter = std::make_shared<MessageRouter>());


MessageRouter::SPtr _messageRouter;
mutable std::mutex _connectionMapMutex;
std::map<cip::CipUint, IOConnection::SPtr> _connectionMap;
mutable std::mutex _socketMapMutex;
std::map<sockets::EndPoint, std::shared_ptr<sockets::UDPBoundSocket>> _socketMap;

sockets::UDPBoundSocket::SPtr findOrCreateSocket(const sockets::EndPoint& endPoint);
Expand Down
2 changes: 1 addition & 1 deletion src/IOConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ namespace eipScanner {
auto periodInMicroS = sinceLastHandle.count() * 1000;
_connectionTimeoutCount += periodInMicroS;
if (_connectionTimeoutCount > _connectionTimeoutMultiplier * _t2oAPI) {
Logger(LogLevel::WARNING) << "Connection SeriaNumber=" << _serialNumber << " is closed by timeout";
Logger(LogLevel::WARNING) << "Connection ID "<< _t2oNetworkConnectionId << " SeriaNumber=" << _serialNumber << " is closed by timeout";
_closeHandle();
return false;
}
Expand Down
3 changes: 3 additions & 0 deletions src/sockets/BaseSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ namespace sockets {
}

void BaseSocket::select(std::vector<BaseSocket::SPtr> sockets, std::chrono::milliseconds timeout) {
if (sockets.empty()) {
return;
}
BaseSocket::SPtr socketWithMaxFd = *std::max_element(sockets.begin(), sockets.end(), [](auto sock1, auto sock2) {
return sock1->getSocketFd() < sock2->getSocketFd();
});
Expand Down