Skip to content

Commit 4f49cf9

Browse files
akampmannAlexandru Kampmann
andauthored
Feature/endpoint deletion (#17)
* squashed commits from endpoint deletion, multiple callback features * updating r5 config Co-authored-by: Alexandru Kampmann <kampmann@embedded.rwth-aachen.de>
1 parent 1410a87 commit 4f49cf9

38 files changed

+1929
-725
lines changed

include/rtps/common/types.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,10 @@ struct SequenceNumber_t {
158158
return high < other.high || (high == other.high && low < other.low);
159159
}
160160

161+
bool operator>(const SequenceNumber_t &other) const {
162+
return high > other.high || (high == other.high && low > other.low);
163+
}
164+
161165
bool operator<=(const SequenceNumber_t &other) const {
162166
return *this == other || *this < other;
163167
}
@@ -177,7 +181,11 @@ struct SequenceNumber_t {
177181
}
178182
};
179183

180-
const uint32_t SNS_NUM_BITS = 32;
184+
#define SNS_MAX_NUM_BITS 32
185+
#define SNS_NUM_BYTES (SNS_MAX_NUM_BITS / 8)
186+
static_assert(!(SNS_MAX_NUM_BITS % 32) && SNS_MAX_NUM_BITS != 0,
187+
"SNS_MAX_NUM_BITS must be multiple of 32");
188+
181189
struct SequenceNumberSet {
182190

183191
SequenceNumberSet() = default;
@@ -186,12 +194,12 @@ struct SequenceNumberSet {
186194

187195
SequenceNumber_t base = {0, 0};
188196
// Cannot be static because of packed
189-
uint32_t numBits = SNS_NUM_BITS;
190-
std::array<uint32_t, 1> bitMap{};
197+
uint32_t numBits = 0;
198+
std::array<uint32_t, (SNS_MAX_NUM_BITS / 32)> bitMap{};
191199

192200
// We only need 1 byte because atm we don't store packets.
193201
bool isSet(uint32_t bit) const {
194-
if (bit >= SNS_NUM_BITS) {
202+
if (bit >= SNS_MAX_NUM_BITS) {
195203
return true;
196204
}
197205
const auto bucket = static_cast<uint8_t>(bit / 32);

include/rtps/config_r5.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const uint8_t NUM_READER_PROXIES_PER_WRITER = 60;
2929
const uint8_t MAX_NUM_UNMATCHED_REMOTE_WRITERS = 150;
3030
const uint8_t MAX_NUM_UNMATCHED_REMOTE_READERS = 150;
3131

32+
const uint8_t MAX_NUM_READER_CALLBACKS = 5;
33+
3234
const uint8_t HISTORY_SIZE_STATELESS = 64;
3335
const uint8_t HISTORY_SIZE_STATEFUL = 64;
3436

@@ -37,7 +39,7 @@ const uint8_t MAX_TOPICNAME_LENGTH = 64;
3739

3840
const int HEARTBEAT_STACKSIZE = 1200; // byte
3941
const int THREAD_POOL_WRITER_STACKSIZE = 1100; // byte
40-
const int THREAD_POOL_READER_STACKSIZE = 1600; // byte
42+
const int THREAD_POOL_READER_STACKSIZE = 3600; // byte
4143
const uint16_t SPDP_WRITER_STACKSIZE = 550; // byte
4244

4345
const uint16_t SF_WRITER_HB_PERIOD_MS = 4000;
@@ -48,8 +50,8 @@ const uint8_t SPDP_CYCLECOUNT_HEARTBEAT =
4850
const uint8_t SPDP_MAX_NUMBER_FOUND_PARTICIPANTS = 100;
4951
const uint8_t SPDP_MAX_NUM_LOCATORS = 1;
5052
const Duration_t SPDP_DEFAULT_REMOTE_LEASE_DURATION = {
51-
100, 0}; // Default lease duration for remote participants, usually
52-
// overwritten by remote info
53+
5, 0}; // Default lease duration for remote participants, usually
54+
// overwritten by remote info
5355
const Duration_t SPDP_MAX_REMOTE_LEASE_DURATION = {
5456
180,
5557
0}; // Absolute maximum lease duration, ignoring remote participant info

include/rtps/discovery/SEDPAgent.h

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,22 @@ class SEDPAgent {
4040
void init(Participant &part, const BuiltInEndpoints &endpoints);
4141
void addWriter(Writer &writer);
4242
void addReader(Reader &reader);
43+
bool deleteReader(Reader *reader);
44+
bool deleteWriter(Writer *reader);
45+
4346
void registerOnNewPublisherMatchedCallback(void (*callback)(void *arg),
4447
void *args);
4548
void registerOnNewSubscriberMatchedCallback(void (*callback)(void *arg),
4649
void *args);
4750
void removeUnmatchedEntitiesOfParticipant(const GuidPrefix_t &guidPrefix);
51+
void removeUnmatchedEntity(const Guid_t &guid);
52+
4853
uint32_t getNumRemoteUnmatchedReaders();
4954
uint32_t getNumRemoteUnmatchedWriters();
5055

5156
protected: // For testing purposes
52-
void onNewPublisher(const TopicData &writerData);
53-
void onNewSubscriber(const TopicData &writerData);
57+
void handlePublisherReaderMessage(const TopicData &writerData);
58+
void handleSubscriptionReaderMessage(const TopicData &writerData);
5459

5560
private:
5661
Participant *m_part;
@@ -74,18 +79,38 @@ class SEDPAgent {
7479
void tryMatchUnmatchedEndpoints();
7580
void addUnmatchedRemoteWriter(const TopicData &writerData);
7681
void addUnmatchedRemoteReader(const TopicData &readerData);
82+
void addUnmatchedRemoteWriter(const TopicDataCompressed &writerData);
83+
void addUnmatchedRemoteReader(const TopicDataCompressed &readerData);
84+
85+
void handleRemoteEndpointDeletion(const TopicData &topic);
7786

7887
void (*mfp_onNewPublisherCallback)(void *arg) = nullptr;
7988
void *m_onNewPublisherArgs = nullptr;
8089
void (*mfp_onNewSubscriberCallback)(void *arg) = nullptr;
8190
void *m_onNewSubscriberArgs = nullptr;
8291

83-
static void receiveCallbackPublisher(void *callee,
84-
const ReaderCacheChange &cacheChange);
85-
static void receiveCallbackSubscriber(void *callee,
92+
static void jumppadPublisherReader(void *callee,
93+
const ReaderCacheChange &cacheChange);
94+
static void jumppadSubscriptionReader(void *callee,
8695
const ReaderCacheChange &cacheChange);
87-
void onNewPublisher(const ReaderCacheChange &change);
88-
void onNewSubscriber(const ReaderCacheChange &change);
96+
97+
static void jumppadTakeProxyOfDisposedReader(const Reader *reader,
98+
const WriterProxy &proxy,
99+
void *arg);
100+
static void jumppadTakeProxyOfDisposedWriter(const Writer *writer,
101+
const ReaderProxy &proxy,
102+
void *arg);
103+
104+
void handlePublisherReaderMessage(const ReaderCacheChange &change);
105+
void handleSubscriptionReaderMessage(const ReaderCacheChange &change);
106+
107+
template <typename A> bool deleteEndpoint(A *endpoint, Writer *sedp_endpoint);
108+
109+
template <typename A>
110+
bool announceEndpointDeletion(A *local_endpoint, Writer *sedp_endpoint);
111+
112+
template <typename A>
113+
bool disposeEndpointInSEDPHistory(A *local_endpoint, Writer *sedp_writer);
89114
};
90115
} // namespace rtps
91116

include/rtps/discovery/TopicData.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ struct TopicData {
4848
FullLengthLocator unicastLocator;
4949
FullLengthLocator multicastLocator;
5050

51+
uint8_t statusInfo;
52+
bool statusInfoValid;
53+
// Use Case: Remotes communicates id of deleted endpoint through key_hash
54+
// parameter
55+
EntityId_t entityIdFromKeyHash;
56+
bool entityIdFromKeyHashValid;
57+
5158
TopicData()
5259
: endpointGuid(GUID_UNKNOWN), typeName{'\0'}, topicName{'\0'},
5360
reliabilityKind(ReliabilityKind_t::BEST_EFFORT),
@@ -69,14 +76,16 @@ struct TopicData {
6976

7077
bool readFromUcdrBuffer(ucdrBuffer &buffer);
7178
bool serializeIntoUcdrBuffer(ucdrBuffer &buffer) const;
79+
80+
bool isDisposedFlagSet() const;
81+
bool isUnregisteredFlagSet() const;
7282
};
7383

7484
struct TopicDataCompressed {
7585
Guid_t endpointGuid;
7686
std::size_t topicHash;
7787
std::size_t typeHash;
78-
ReliabilityKind_t reliabilityKind;
79-
DurabilityKind_t durabilityKind;
88+
bool is_reliable;
8089
LocatorIPv4 unicastLocator;
8190
LocatorIPv4 multicastLocator;
8291

@@ -86,8 +95,9 @@ struct TopicDataCompressed {
8695
topicHash =
8796
hashCharArray(topic_data.topicName, Config::MAX_TOPICNAME_LENGTH);
8897
typeHash = hashCharArray(topic_data.typeName, Config::MAX_TYPENAME_LENGTH);
89-
reliabilityKind = topic_data.reliabilityKind;
90-
durabilityKind = topic_data.durabilityKind;
98+
is_reliable = (topic_data.reliabilityKind == ReliabilityKind_t::RELIABLE)
99+
? true
100+
: false;
91101
unicastLocator = topic_data.unicastLocator;
92102
multicastLocator = topic_data.multicastLocator;
93103
}

include/rtps/entities/Domain.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ class Domain {
5757
Reader *readerExists(Participant &part, const char *topicName,
5858
const char *typeName, bool reliable);
5959

60+
bool deleteWriter(Participant &part, Writer *writer);
61+
bool deleteReader(Participant &part, Reader *reader);
62+
63+
void printInfo();
64+
6065
private:
6166
friend class SizeInspector;
6267
ThreadPool m_threadPool;
@@ -67,14 +72,19 @@ class Domain {
6772

6873
std::array<StatelessWriter, Config::NUM_STATELESS_WRITERS> m_statelessWriters;
6974
std::array<StatelessReader, Config::NUM_STATELESS_READERS> m_statelessReaders;
70-
uint8_t m_numStatelessWriters = 0;
71-
uint8_t m_numStatelessReaders = 0;
7275
std::array<StatefulReader, Config::NUM_STATEFUL_READERS> m_statefulReaders;
73-
uint8_t m_numStatefulReaders = 0;
7476
std::array<StatefulWriter, Config::NUM_STATEFUL_WRITERS> m_statefulWriters;
75-
uint8_t m_numStatefulWriters = 0;
77+
template <typename A, typename B> B *getNextUnusedEndpoint(A &a) {
78+
for (unsigned int i = 0; i < a.size(); i++) {
79+
if (!a[i].isInitialized()) {
80+
return &(a[i]);
81+
}
82+
}
83+
return nullptr;
84+
}
7685

7786
bool m_initComplete = false;
87+
sys_mutex_t m_mutex;
7888

7989
void receiveCallback(const PacketInfo &packet);
8090
GuidPrefix_t generateGuidPrefix(ParticipantId_t id) const;

include/rtps/entities/Participant.h

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,30 +68,33 @@ class Participant {
6868
//! Not-thread-safe function to add a writer
6969
Writer *addWriter(Writer *writer);
7070
bool isWritersFull();
71+
bool deleteWriter(Writer *writer);
7172

7273
//! Not-thread-safe function to add a reader
7374
Reader *addReader(Reader *reader);
7475
bool isReadersFull();
76+
bool deleteReader(Reader *reader);
7577

7678
//! (Probably) Thread safe if writers cannot be removed
77-
Writer *getWriter(EntityId_t id) const;
78-
Writer *getMatchingWriter(const TopicData &topicData) const;
79-
Writer *getMatchingWriter(const TopicDataCompressed &topicData) const;
79+
Writer *getWriter(EntityId_t id);
80+
Writer *getMatchingWriter(const TopicData &topicData);
81+
Writer *getMatchingWriter(const TopicDataCompressed &topicData);
8082

8183
//! (Probably) Thread safe if readers cannot be removed
82-
Reader *getReader(EntityId_t id) const;
83-
Reader *getReaderByWriterId(const Guid_t &guid) const;
84-
Reader *getMatchingReader(const TopicData &topicData) const;
85-
Reader *getMatchingReader(const TopicDataCompressed &topicData) const;
84+
Reader *getReader(EntityId_t id);
85+
Reader *getReaderByWriterId(const Guid_t &guid);
86+
Reader *getMatchingReader(const TopicData &topicData);
87+
Reader *getMatchingReader(const TopicDataCompressed &topicData);
8688

8789
bool addNewRemoteParticipant(const ParticipantProxyData &remotePart);
8890
bool removeRemoteParticipant(const GuidPrefix_t &prefix);
89-
void removeAllEntitiesOfParticipant(const GuidPrefix_t &prefix);
91+
void removeAllProxiesOfParticipant(const GuidPrefix_t &prefix);
92+
void removeProxyFromAllEndpoints(const Guid_t &guid);
93+
9094
const ParticipantProxyData *findRemoteParticipant(const GuidPrefix_t &prefix);
9195
void refreshRemoteParticipantLiveliness(const GuidPrefix_t &prefix);
9296
uint32_t getRemoteParticipantCount();
9397
MessageReceiver *getMessageReceiver();
94-
void addHeartbeat(GuidPrefix_t sourceGuidPrefix);
9598
bool checkAndResetHeartbeats();
9699

97100
bool hasReaderWithMulticastLocator(ip4_addr_t address);
@@ -100,16 +103,17 @@ class Participant {
100103
void newMessage(const uint8_t *data, DataSize_t size);
101104

102105
SPDPAgent &getSPDPAgent();
106+
void printInfo();
103107

104108
private:
105109
friend class SizeInspector;
106110
MessageReceiver m_receiver;
107111
bool m_hasBuilInEndpoints = false;
108112
std::array<uint8_t, 3> m_nextUserEntityId{{0, 0, 1}};
109-
std::array<Writer *, Config::NUM_WRITERS_PER_PARTICIPANT> m_writers{};
110-
uint8_t m_numWriters = 0;
111-
std::array<Reader *, Config::NUM_READERS_PER_PARTICIPANT> m_readers{};
112-
uint8_t m_numReaders = 0;
113+
std::array<Writer *, Config::NUM_WRITERS_PER_PARTICIPANT> m_writers = {
114+
nullptr};
115+
std::array<Reader *, Config::NUM_READERS_PER_PARTICIPANT> m_readers = {
116+
nullptr};
113117

114118
sys_mutex_t m_mutex;
115119
MemoryPool<ParticipantProxyData, Config::SPDP_MAX_NUMBER_FOUND_PARTICIPANTS>

include/rtps/entities/Reader.h

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ Author: i11 - Embedded Software, RWTH Aachen University
3636
namespace rtps {
3737

3838
struct SubmessageHeartbeat;
39+
struct SubmessageGap;
3940

4041
class ReaderCacheChange {
4142
private:
@@ -79,31 +80,65 @@ typedef void (*ddsReaderCallback_fp)(void *callee,
7980

8081
class Reader {
8182
public:
83+
using callbackFunction_t = void (*)(void *, const ReaderCacheChange &);
84+
using callbackIdentifier_t = uint32_t;
85+
8286
TopicData m_attributes;
8387
virtual void newChange(const ReaderCacheChange &cacheChange) = 0;
84-
virtual void registerCallback(ddsReaderCallback_fp cb, void *callee) = 0;
88+
virtual callbackIdentifier_t registerCallback(callbackFunction_t cb,
89+
void *arg);
90+
virtual bool removeCallback(callbackIdentifier_t identifier);
91+
uint8_t getNumCallbacks();
92+
8593
virtual bool onNewHeartbeat(const SubmessageHeartbeat &msg,
8694
const GuidPrefix_t &remotePrefix) = 0;
95+
virtual bool onNewGapMessage(const SubmessageGap &msg,
96+
const GuidPrefix_t &remotePrefix) = 0;
8797
virtual bool addNewMatchedWriter(const WriterProxy &newProxy) = 0;
88-
virtual void removeWriter(const Guid_t &guid) = 0;
89-
virtual void removeWriterOfParticipant(const GuidPrefix_t &guidPrefix) = 0;
98+
virtual bool removeProxy(const Guid_t &guid);
99+
virtual void removeAllProxiesOfParticipant(const GuidPrefix_t &guidPrefix);
90100
bool isInitialized() { return m_is_initialized_; }
101+
virtual void reset();
102+
bool isProxy(const Guid_t &guid);
103+
WriterProxy *getProxy(Guid_t guid);
104+
uint32_t getProxiesCount();
91105

92-
bool knowWriterId(const Guid_t &guid) {
93-
for (const auto &proxy : m_proxies) {
94-
if (proxy.remoteWriterGuid.operator==(guid)) {
95-
return true;
96-
}
97-
}
98-
return false;
99-
}
106+
void setSEDPSequenceNumber(const SequenceNumber_t &sn);
107+
const SequenceNumber_t &getSEDPSequenceNumber();
108+
109+
using dumpProxyCallback = void (*)(const Reader *reader, const WriterProxy &,
110+
void *arg);
100111

101-
uint32_t getNumMatchedWriters() { return m_proxies.getSize(); }
112+
//! Dangerous, only
113+
int dumpAllProxies(dumpProxyCallback target, void *arg);
102114

103115
protected:
116+
void executeCallbacks(const ReaderCacheChange &cacheChange);
117+
bool initMutex();
118+
119+
SequenceNumber_t m_sedp_sequence_number;
120+
104121
bool m_is_initialized_ = false;
105122
virtual ~Reader() = default;
106123
MemoryPool<WriterProxy, Config::NUM_WRITER_PROXIES_PER_READER> m_proxies;
124+
125+
callbackIdentifier_t m_callback_identifier = 1;
126+
127+
uint8_t m_callback_count = 0;
128+
using callbackElement_t = struct {
129+
callbackFunction_t function = nullptr;
130+
void *arg = nullptr;
131+
callbackIdentifier_t identifier;
132+
};
133+
134+
std::array<callbackElement_t, Config::MAX_NUM_READER_CALLBACKS> m_callbacks =
135+
{nullptr};
136+
137+
// Guards manipulation of the proxies array
138+
sys_mutex_t m_proxies_mutex = nullptr;
139+
140+
// Guards manipulation of callback array
141+
sys_mutex_t m_callback_mutex = nullptr;
107142
};
108143
} // namespace rtps
109144

0 commit comments

Comments
 (0)