@@ -28,7 +28,7 @@ namespace catapult { namespace net {
28
28
ionet::Node Node;
29
29
WeakSocketPointer pSocketWeak;
30
30
std::shared_ptr<ionet::PacketIo> pBufferedIo;
31
- std::shared_ptr <ChainedSocketReader> pReader;
31
+ std::weak_ptr <ChainedSocketReader> pReader;
32
32
};
33
33
34
34
// expected sequences
@@ -38,6 +38,7 @@ namespace catapult { namespace net {
38
38
class WriterContainer {
39
39
private:
40
40
using ReaderWriterContainer = std::unordered_map<Key, ReaderWriterState, utils::ArrayHasher<Key>>;
41
+ using ChainedSocketReaderFactory = std::function<std::shared_ptr<ChainedSocketReader> (const SocketPointer&, const std::shared_ptr<ionet::PacketIo>&)>;
41
42
42
43
public:
43
44
size_t size () const {
@@ -97,32 +98,42 @@ namespace catapult { namespace net {
97
98
m_connectingNodeIdentityKeys.erase (node.identityKey ());
98
99
}
99
100
100
- bool insert (const ReaderWriterState& state ) {
101
+ bool insert (net::ConnectionType connectionType, ionet::Node node, const SocketPointer& pSocket, const ChainedSocketReaderFactory& readerFactory ) {
101
102
std::unique_lock guard (m_mutex);
102
103
103
- const auto & identityKey = state.Node .identityKey ();
104
- switch (state.ConnectionType ) {
104
+ const auto & identityKey = node.identityKey ();
105
+ ReaderWriterState* pState = nullptr ;
106
+ switch (connectionType) {
105
107
case ConnectionType::Connected: {
106
108
if (!m_connectedNodeIdentityKeys.emplace (identityKey).second ) {
107
- CATAPULT_LOG (debug) << " ignoring connection to already connected peer " << state. Node << " " << identityKey;
109
+ CATAPULT_LOG (debug) << " ignoring connection to already connected peer " << node << " " << identityKey;
108
110
return false ;
109
111
}
110
112
m_connectingNodeIdentityKeys.erase (identityKey);
111
- m_connectedWriters[identityKey] = state ;
113
+ pState = & m_connectedWriters[identityKey];
112
114
break ;
113
115
}
114
116
case ConnectionType::Accepted: {
115
117
if (!m_acceptedNodeIdentityKeys.emplace (identityKey).second ) {
116
- CATAPULT_LOG (debug) << " not accepting already connected peer " << state. Node << " " << identityKey;
118
+ CATAPULT_LOG (debug) << " not accepting already connected peer " << node << " " << identityKey;
117
119
return false ;
118
120
}
119
- m_acceptedWriters[identityKey] = state ;
121
+ pState = & m_acceptedWriters[identityKey];
120
122
break ;
121
123
}
122
124
}
123
125
124
126
m_nodeIdentityKeys.emplace (identityKey);
125
- state.pReader ->start ();
127
+
128
+ pState->ConnectionType = connectionType;
129
+ pState->Node = node;
130
+ pState->pSocketWeak = pSocket;
131
+ pState->pBufferedIo = pSocket->buffered ();
132
+ // the reader takes ownership of the socket
133
+ auto pReader = readerFactory (pSocket, pState->pBufferedIo );
134
+ pReader->start ();
135
+ pState->pReader = pReader;
136
+
126
137
return true ;
127
138
}
128
139
@@ -320,15 +331,11 @@ namespace catapult { namespace net {
320
331
}
321
332
322
333
bool addWriter (const ionet::Node& node, const ionet::SslPacketSocketInfo& socketInfo, ConnectionType connectionType) {
323
- ReaderWriterState state;
324
- state.ConnectionType = connectionType;
325
- state.Node = node;
326
334
const auto & pSocket = socketInfo.socket ();
327
- state.pSocketWeak = pSocket;
328
- state.pBufferedIo = pSocket->buffered ();
329
335
auto identity = ionet::ReaderIdentity{ node.identityKey (), socketInfo.host () };
330
- state.pReader = createReader (pSocket, state.pBufferedIo , identity, connectionType);
331
- return m_writers.insert (state);
336
+ return m_writers.insert (connectionType, node, pSocket, [this , identity, connectionType](const auto & pSocket, const auto & pBufferedIo) {
337
+ return this ->createReader (pSocket, pBufferedIo, identity, connectionType);
338
+ });
332
339
}
333
340
334
341
void removeWriter (const WeakSocketPointer& pSocketWeak, const Key& identityKey, ConnectionType connectionType) {
0 commit comments