Skip to content

Commit 9f5b661

Browse files
author
Alexandru Kampmann
committed
* adding flag to indicate that writer/reader has been initialized
* adding API to check if a reader or writer for specific topic/typename already exists
1 parent 00a8e6b commit 9f5b661

File tree

9 files changed

+116
-5
lines changed

9 files changed

+116
-5
lines changed

include/rtps/entities/Domain.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ namespace rtps{
4747
Writer* createWriter(Participant& part, const char* topicName, const char* typeName, bool reliable);
4848
Reader* createReader(Participant& part, const char* topicName, const char* typeName, bool reliable);
4949

50+
Writer* writerExists(Participant& part, const char* topicName, const char* typeName, bool reliable);
51+
Reader* readerExists(Participant& part, const char* topicName, const char* typeName, bool reliable);
52+
5053
private:
5154
ThreadPool m_threadPool;
5255
UdpDriver m_transport;
@@ -55,8 +58,8 @@ namespace rtps{
5558
ParticipantId_t m_nextParticipantId = PARTICIPANT_START_ID;
5659

5760
std::array<StatelessWriter, Config::NUM_STATELESS_WRITERS> m_statelessWriters;
58-
uint8_t m_numStatelessWriters = 0;
5961
std::array<StatelessReader, Config::NUM_STATELESS_READERS> m_statelessReaders;
62+
uint8_t m_numStatelessWriters = 0;
6063
uint8_t m_numStatelessReaders = 0;
6164
std::array<StatefulReader, Config::NUM_STATEFUL_READERS> m_statefulReaders;
6265
uint8_t m_numStatefulReaders = 0;

include/rtps/entities/Reader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,11 @@ namespace rtps{
8686
virtual bool onNewHeartbeat(const SubmessageHeartbeat& msg, const GuidPrefix_t& remotePrefix) = 0;
8787
virtual bool addNewMatchedWriter(const WriterProxy& newProxy) = 0;
8888
virtual void removeWriter(const Guid& guid) = 0;
89+
bool isInitialized(){
90+
return m_is_initialized_;
91+
}
8992
protected:
93+
bool m_is_initialized_ = false;
9094
virtual ~Reader() = default;
9195
};
9296
}

include/rtps/entities/StatefulReader.tpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ void StatefulReaderT<NetworkDriver>::init(const TopicData& attributes, NetworkDr
4848
m_transport = &driver;
4949
m_packetInfo.srcPort = attributes.unicastLocator.port;
5050
sys_mutex_new(&m_mutex);
51+
m_is_initialized_ = true;
5152
}
5253

5354
template <class NetworkDriver>

include/rtps/entities/StatefulWriter.tpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ bool StatefulWriterT<NetworkDriver>::init(TopicData attributes, TopicKind_t topi
7878
m_heartbeatThread = sys_thread_new("HBThread", hbFunctionJumppad, this, Config::HEARTBEAT_STACKSIZE, Config::THREAD_POOL_WRITER_PRIO);
7979
}
8080

81+
m_is_initialized_ = true;
8182
return true;
8283
}
8384

include/rtps/entities/StatelessWriter.tpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ bool StatelessWriterT<NetworkDriver>::init(TopicData attributes, TopicKind_t top
6464
mp_threadPool = threadPool;
6565
m_transport = &driver;
6666

67+
m_is_initialized_ = true;
6768
return true;
6869
}
6970

include/rtps/entities/Writer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ namespace rtps{
4545
virtual void setAllChangesToUnsent() = 0;
4646
virtual void onNewAckNack(const SubmessageAckNack& msg, const GuidPrefix_t& sourceGuidPrefix) = 0;
4747

48+
bool isInitialized(){
49+
return m_is_initialized_;
50+
}
51+
4852
protected:
53+
bool m_is_initialized_ = false;
4954
virtual ~Writer() = default;
5055
};
5156
}

src/discovery/SPDPAgent.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ bool SPDPAgent::addProxiesForBuiltInEndpoints(){
188188
return false;
189189
}
190190

191+
ip4_addr_t ip4addr = locator->getIp4Address();
192+
const char* addr = ip4addr_ntoa(&ip4addr);
193+
printf("Adding IPv4 Locator %s", addr);
194+
191195

192196
if (m_proxyDataBuffer.hasPublicationWriter()){
193197
const WriterProxy proxy{{m_proxyDataBuffer.m_guid.prefix, ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER}, *locator};

src/entities/Domain.cpp

Lines changed: 95 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,95 @@ void Domain::registerPort(const Participant& part){
172172
m_transport.createUdpConnection(getBuiltInUnicastPort(part.m_participantId));
173173
}
174174

175+
rtps::Reader* Domain::readerExists(Participant& part, const char* topicName, const char* typeName, bool reliable){
176+
if(reliable){
177+
for(unsigned int i = 0; i < m_numStatefulReaders; i++){
178+
if(m_statefulReaders[i].isInitialized()){
179+
if(strncmp(m_statefulReaders[i].m_attributes.topicName, topicName, Config::MAX_TYPENAME_LENGTH) != 0){
180+
continue;
181+
}
182+
183+
if(strncmp(m_statefulReaders[i].m_attributes.typeName, typeName, Config::MAX_TYPENAME_LENGTH) != 0){
184+
continue;
185+
}
186+
187+
#if DOMAIN_VERBOSE
188+
printf("StatefulReader exists already [%s, %s]\n", topicName, typeName);
189+
#endif
190+
191+
return &m_statefulReaders[i];
192+
}
193+
}
194+
}else{
195+
for(unsigned int i = 0; i < m_numStatelessReaders; i++){
196+
if(m_statelessReaders[i].isInitialized()){
197+
if(strncmp(m_statelessReaders[i].m_attributes.topicName, topicName, Config::MAX_TYPENAME_LENGTH) != 0){
198+
continue;
199+
}
200+
201+
if(strncmp(m_statelessReaders[i].m_attributes.typeName, typeName, Config::MAX_TYPENAME_LENGTH) != 0){
202+
continue;
203+
}
204+
205+
#if DOMAIN_VERBOSE
206+
printf("StatelessReader exists [%s, %s]\n", topicName, typeName);
207+
#endif
208+
return &m_statelessReaders[i];
209+
}
210+
}
211+
}
212+
213+
return nullptr;
214+
}
215+
216+
217+
rtps::Writer* Domain::writerExists(Participant& part, const char* topicName, const char* typeName, bool reliable){
218+
if(reliable){
219+
for(unsigned int i = 0; i < m_numStatefulWriters; i++){
220+
if(m_statefulWriters[i].isInitialized()){
221+
if(strncmp(m_statefulWriters[i].m_attributes.topicName, topicName, Config::MAX_TYPENAME_LENGTH) != 0){
222+
continue;
223+
}
224+
225+
if(strncmp(m_statefulWriters[i].m_attributes.typeName, typeName, Config::MAX_TYPENAME_LENGTH) != 0){
226+
continue;
227+
}
228+
229+
#if DOMAIN_VERBOSE
230+
printf("StatefulWriter exists [%s, %s]\n", topicName, typeName);
231+
#endif
232+
233+
return &m_statefulWriters[i];
234+
}
235+
}
236+
}else{
237+
for(unsigned int i = 0; i < m_numStatelessWriters; i++){
238+
if(m_statelessWriters[i].isInitialized()){
239+
if(strncmp(m_statelessWriters[i].m_attributes.topicName, topicName, Config::MAX_TYPENAME_LENGTH) != 0){
240+
continue;
241+
}
242+
243+
if(strncmp(m_statelessWriters[i].m_attributes.typeName, typeName, Config::MAX_TYPENAME_LENGTH) != 0){
244+
continue;
245+
}
246+
247+
#if DOMAIN_VERBOSE
248+
printf("StatelessWriter exists [%s, %s]\n", topicName, typeName);
249+
#endif
250+
return &m_statelessWriters[i];
251+
}
252+
}
253+
}
254+
255+
return nullptr;
256+
}
257+
175258
rtps::Writer* Domain::createWriter(Participant& part, const char* topicName, const char* typeName, bool reliable){
259+
#if DOMAIN_VERBOSE
260+
printf("Creating writer[%s, %s]\n", topicName, typeName);
261+
#endif
262+
263+
// Check if there is enough capacity for more writers
176264
if((reliable && m_statefulWriters.size() <= m_numStatefulWriters) ||
177265
(!reliable && m_statelessWriters.size() <= m_numStatelessWriters) ||
178266
part.isWritersFull()){
@@ -215,10 +303,13 @@ rtps::Writer* Domain::createWriter(Participant& part, const char* topicName, con
215303

216304

217305
rtps::Reader* Domain::createReader(Participant& part, const char* topicName, const char* typeName, bool reliable){
218-
if((reliable && m_statefulReaders.size() <= m_numStatefulReaders) ||
219-
(!reliable && m_statelessReaders.size() <= m_numStatelessReaders) ||
220-
part.isReadersFull()){
221-
return nullptr;
306+
#if DOMAIN_VERBOSE
307+
printf("Creating reader[%s, %s]\n", topicName, typeName);
308+
#endif
309+
if((reliable && m_statefulReaders.size() <= m_numStatefulReaders) ||
310+
(!reliable && m_statelessReaders.size() <= m_numStatelessReaders) ||
311+
part.isReadersFull()){
312+
return nullptr;
222313
}
223314

224315
// TODO Distinguish WithKey and NoKey (Also changes EntityKind)

src/entities/StatelessReader.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ using rtps::StatelessReader;
3131

3232
void StatelessReader::init(const TopicData& attributes){
3333
m_attributes = attributes;
34+
m_is_initialized_ = true;
3435
}
3536

3637
void StatelessReader::newChange(const ReaderCacheChange& cacheChange){

0 commit comments

Comments
 (0)