Skip to content

Commit

Permalink
Fix for liveliness changed struct (#578)
Browse files Browse the repository at this point in the history
* Refs #5765 Adding a test

* Refs #5765 Fix for liveliness changed status

* Refs #5765 Adding more tests

* Refs #5765 Addressing some review comments

* Refs #5765 Adding indentation
  • Loading branch information
raquelalvarezbanos authored and MiguelCompany committed Jun 28, 2019
1 parent 381c2d3 commit 17fee82
Show file tree
Hide file tree
Showing 11 changed files with 475 additions and 238 deletions.
8 changes: 4 additions & 4 deletions include/fastrtps/qos/LivelinessChangedStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ struct LivelinessChangedStatus
//! or when a publisher previously considered to be not alive reasserts its liveliness. The count decreases
//! when a publisher considered alive fails to assert its liveliness and becomes not alive, whether because
//! it was deleted normally or for some other reason
uint32_t alive_count = 0;
int32_t alive_count = 0;

//! @brief The total count of current publishers that write the topic read by the subscriber that are no longer
//! asserting their liveliness
//! @details This count increases when a publisher considered alive fails to assert its liveliness and becomes
//! not alive for some reason other than the normal deletion of that publisher. It decreases when a previously
//! not alive publisher either reasserts its liveliness or is deleted normally
uint32_t not_alive_count = 0;
int32_t not_alive_count = 0;

//! @brief The change in the alive_count since the last time the listener was called or the status was read
uint32_t alive_count_change = 0;
int32_t alive_count_change = 0;

//! @brief The change in the not_alive_count since the last time the listener was called or the status was read
uint32_t not_alive_count_change = 0;
int32_t not_alive_count_change = 0;

//! @brief Handle to the last publisher whose change in liveliness caused this status to change
rtps::InstanceHandle_t last_publication_handle;
Expand Down
38 changes: 18 additions & 20 deletions include/fastrtps/rtps/builtin/liveliness/WLP.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,49 +205,47 @@ class WLP
LivelinessManager* sub_liveliness_manager_;

/**
* @brief A method invoked by pub_liveliness_manager_ to inform that a writer lost liveliness
* @brief A method invoked by pub_liveliness_manager_ to inform that a writer changed its liveliness
* @param writer The writer losing liveliness
* @param kind The liveliness kind
* @param lease_duration The liveliness lease duration
* @param alive_change The change in the alive count
* @param not_alive_change The change in the not alive count
*/
void pub_liveliness_lost(
void pub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration);
const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change);

/**
* @brief A method invoked by sub_liveliness_manager_ to inform that a writer lost liveliness
* @brief A method invoked by sub_liveliness_manager_ to inform that a writer changed its liveliness
* @param writer The writer losing liveliness
* @param kind The liveliness kind of the writer losing liveliness
* @param lease_duration The liveliness lease duration of the writer losing liveliness
* @param alive_change The change in the alive count
* @param not_alive_change The change in the not alive count
*/
void sub_liveliness_lost(
void sub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration);

/**
* @brief A method invoked by sub_liveliness_manager_ to inform that a writer recovered liveliness
* @param writer The writer recovering liveliness
* @param kind The liveliness kind of the writer recovering liveliness
* @param lease_duration The liveliness lease duration of the writer recovering liveliness
*/
void sub_liveliness_recovered(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration);

const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change);

/**
* @brief A method to update the liveliness changed status of a given reader
* @param writer The writer changing liveliness, specified by its guid
* @param reader The reader whose liveliness needs to be updated
* @param lost True to indicate that liveliness of the writer was lost. False to indicate it was recovered
* @param alive_change The change requested for alive count. Should be -1, 0 or +1
* @param not_alive_change The change requested for not alive count. Should be -1, 0 or +1
*/
void update_liveliness_changed_status(
GUID_t writer,
RTPSReader* reader,
bool lost);
int32_t alive_change,
int32_t not_alive_change);

#if HAVE_SECURITY
//!Pointer to the builtinRTPSParticipantMEssageWriter.
Expand Down
16 changes: 14 additions & 2 deletions include/fastrtps/rtps/writer/LivelinessData.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ namespace rtps {
*/
struct LivelinessData
{
enum WriterStatus
{
//! Writer is matched but liveliness has not been asserted yet
NOT_ASSERTED = 0,
//! Writer is alive
ALIVE = 1,
//! Writer is not alive
NOT_ALIVE = 2
};

/**
* @brief Constructor
* @param guid_in GUID of the writer
Expand All @@ -46,12 +56,14 @@ struct LivelinessData
: guid(guid_in)
, kind(kind_in)
, lease_duration(lease_duration_in)
, status(WriterStatus::NOT_ASSERTED)
{}

LivelinessData()
: guid()
, kind(AUTOMATIC_LIVELINESS_QOS)
, lease_duration(c_TimeInfinite)
, status(WriterStatus::NOT_ASSERTED)
{}

~LivelinessData()
Expand Down Expand Up @@ -91,8 +103,8 @@ struct LivelinessData
//! The number of times the writer is being counted
unsigned int count = 1;

//! True if the writer is alive, false otherwise
bool alive = false;
//! The writer status
WriterStatus status;

//! The time when the writer will lose liveliness
std::chrono::steady_clock::time_point time;
Expand Down
37 changes: 16 additions & 21 deletions include/fastrtps/rtps/writer/LivelinessManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

using LivelinessCallback = std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&,
int32_t alive_change,
int32_t not_alive_change)>;

/**
* @brief A class managing the liveliness of a set of writers. Writers are represented by their LivelinessData
* @details Uses a shared timed event and informs outside classes on liveliness changes
Expand All @@ -39,21 +46,13 @@ class LivelinessManager

/**
* @brief Constructor
* @param liveliness_lost_callback A callback that will be invoked when a writer loses liveliness
* @param liveliness_recovered_callback A callback that will be invoked when a writer recovers liveliness
* @param callback A callback that will be invoked when a writer changes its liveliness status
* @param service The asio I/O service
* @param event_thread The event thread
* @param manage_automatic True to manage writers with automatic liveliness, false otherwise
*/
LivelinessManager(
const std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)>& liveliness_lost_callback,
const std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)>& liveliness_recovered_callback,
const LivelinessCallback& callback,
asio::io_service& service,
const std::thread& event_thread,
bool manage_automatic = true);
Expand Down Expand Up @@ -128,6 +127,11 @@ class LivelinessManager

private:

//! @brief A method responsible for invoking the callback when liveliness is asserted
//! @param writer The liveliness data of the writer asserting liveliness
//!
void assert_writer_liveliness(LivelinessData& writer);

/**
* @brief A method to calculate the time when the next writer is going to lose liveliness
* @details This method is public for testing purposes but it should not be used from outside this class
Expand All @@ -151,17 +155,8 @@ class LivelinessManager
//! A method called if the timer expires
void timer_expired();

//! A callback to inform outside classes that a writer lost its liveliness
std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)> liveliness_lost_callback_;

//! A callback to inform outside classes that a writer recovered its liveliness
std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)> liveliness_recovered_callback_;
//! A callback to inform outside classes that a writer changed its liveliness status
LivelinessCallback callback_;

//! A boolean indicating whether we are managing writers with automatic liveliness
bool manage_automatic_;
Expand Down
95 changes: 47 additions & 48 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,34 @@ bool WLP::initWL(RTPSParticipantImpl* p)
pub_liveliness_manager_ = new LivelinessManager(
[&](const GUID_t& guid,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration) -> void
const Duration_t& lease_duration,
int alive_count,
int not_alive_count) -> void
{
pub_liveliness_lost(guid, kind, lease_duration);
pub_liveliness_changed(
guid,
kind,
lease_duration,
alive_count,
not_alive_count);
},
nullptr,
mp_participant->getEventResource().getIOService(),
mp_participant->getEventResource().getThread(),
false);

sub_liveliness_manager_ = new LivelinessManager(
[&](const GUID_t& guid,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration) -> void
{
sub_liveliness_lost(guid, kind, lease_duration);
},
[&](const GUID_t& guid, const LivelinessQosPolicyKind& kind, const Duration_t& lease_duration) -> void
const Duration_t& lease_duration,
int alive_count,
int not_alive_count) -> void
{
sub_liveliness_recovered(guid, kind, lease_duration);
sub_liveliness_changed(
guid,
kind,
lease_duration,
alive_count,
not_alive_count);
},
mp_participant->getEventResource().getIOService(),
mp_participant->getEventResource().getThread());
Expand Down Expand Up @@ -813,12 +822,21 @@ bool WLP::assert_liveliness(
lease_duration);
}

void WLP::pub_liveliness_lost(
void WLP::pub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration)
const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change)
{
(void)lease_duration;
(void)alive_change;

// On the publishing side we only have to notify if one of our writers loses liveliness
if (not_alive_change != 1)
{
return;
}

if (kind == AUTOMATIC_LIVELINESS_QOS)
{
Expand Down Expand Up @@ -882,34 +900,12 @@ void WLP::pub_liveliness_lost(
}
}

void WLP::sub_liveliness_lost(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration)
{
// Writer with given guid lost liveliness, check which readers were matched and inform them

RemoteWriterAttributes ratt;
ratt.guid = writer;

for (RTPSReader* reader : readers_)
{
if (reader->liveliness_kind_ == kind &&
reader->liveliness_lease_duration_ == lease_duration)
{
if (reader->matched_writer_is_matched(ratt))
{
update_liveliness_changed_status(writer, reader, true);
}
}
}
}

void WLP::sub_liveliness_recovered(
void WLP::sub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration)

const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change)
{
// Writer with given guid lost liveliness, check which readers were matched and inform them

Expand All @@ -923,7 +919,11 @@ void WLP::sub_liveliness_recovered(
{
if (reader->matched_writer_is_matched(ratt))
{
update_liveliness_changed_status(writer, reader, false);
update_liveliness_changed_status(
writer,
reader,
alive_change,
not_alive_change);
}
}
}
Expand All @@ -932,23 +932,22 @@ void WLP::sub_liveliness_recovered(
void WLP::update_liveliness_changed_status(
GUID_t writer,
RTPSReader* reader,
bool lost)
int32_t alive_change,
int32_t not_alive_change)
{
int change = lost ? -1 : 1;

reader->liveliness_changed_status_.alive_count += change;
reader->liveliness_changed_status_.alive_count_change += change;
reader->liveliness_changed_status_.not_alive_count -= change;
reader->liveliness_changed_status_.not_alive_count_change -= change;
reader->liveliness_changed_status_.alive_count += alive_change;
reader->liveliness_changed_status_.alive_count_change += alive_change;
reader->liveliness_changed_status_.not_alive_count += not_alive_change;
reader->liveliness_changed_status_.not_alive_count_change += not_alive_change;
reader->liveliness_changed_status_.last_publication_handle = writer;

if (reader->getListener() != nullptr)
{
reader->getListener()->on_liveliness_changed(reader, reader->liveliness_changed_status_);
}

reader->liveliness_changed_status_.alive_count_change = 0;
reader->liveliness_changed_status_.not_alive_count_change = 0;
reader->liveliness_changed_status_.alive_count_change = 0;
reader->liveliness_changed_status_.not_alive_count_change = 0;
}
}

} /* namespace rtps */
Expand Down
Loading

0 comments on commit 17fee82

Please sign in to comment.