Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix for liveliness changed struct [5765] #578

Merged
merged 5 commits into from
Jun 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions include/fastrtps/qos/LivelinessChangedStatus.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,20 @@ struct LivelinessChangedStatus
//! or when a publisher previously considered to be not alive reasserts its liveliness. The count decreases
//! when a publisher considered alive fails to assert its liveliness and becomes not alive, whether because
//! it was deleted normally or for some other reason
uint32_t alive_count = 0;
int32_t alive_count = 0;

//! @brief The total count of current publishers that write the topic read by the subscriber that are no longer
//! asserting their liveliness
//! @details This count increases when a publisher considered alive fails to assert its liveliness and becomes
//! not alive for some reason other than the normal deletion of that publisher. It decreases when a previously
//! not alive publisher either reasserts its liveliness or is deleted normally
uint32_t not_alive_count = 0;
int32_t not_alive_count = 0;

//! @brief The change in the alive_count since the last time the listener was called or the status was read
uint32_t alive_count_change = 0;
int32_t alive_count_change = 0;

//! @brief The change in the not_alive_count since the last time the listener was called or the status was read
uint32_t not_alive_count_change = 0;
int32_t not_alive_count_change = 0;

//! @brief Handle to the last publisher whose change in liveliness caused this status to change
rtps::InstanceHandle_t last_publication_handle;
Expand Down
38 changes: 18 additions & 20 deletions include/fastrtps/rtps/builtin/liveliness/WLP.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,49 +205,47 @@ class WLP
LivelinessManager* sub_liveliness_manager_;

/**
* @brief A method invoked by pub_liveliness_manager_ to inform that a writer lost liveliness
* @brief A method invoked by pub_liveliness_manager_ to inform that a writer changed its liveliness
* @param writer The writer losing liveliness
* @param kind The liveliness kind
* @param lease_duration The liveliness lease duration
* @param alive_change The change in the alive count
* @param not_alive_change The change in the not alive count
*/
void pub_liveliness_lost(
void pub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration);
const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change);

/**
* @brief A method invoked by sub_liveliness_manager_ to inform that a writer lost liveliness
* @brief A method invoked by sub_liveliness_manager_ to inform that a writer changed its liveliness
* @param writer The writer losing liveliness
* @param kind The liveliness kind of the writer losing liveliness
* @param lease_duration The liveliness lease duration of the writer losing liveliness
* @param alive_change The change in the alive count
* @param not_alive_change The change in the not alive count
*/
void sub_liveliness_lost(
void sub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration);

/**
* @brief A method invoked by sub_liveliness_manager_ to inform that a writer recovered liveliness
* @param writer The writer recovering liveliness
* @param kind The liveliness kind of the writer recovering liveliness
* @param lease_duration The liveliness lease duration of the writer recovering liveliness
*/
void sub_liveliness_recovered(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration);

const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change);

/**
* @brief A method to update the liveliness changed status of a given reader
* @param writer The writer changing liveliness, specified by its guid
* @param reader The reader whose liveliness needs to be updated
* @param lost True to indicate that liveliness of the writer was lost. False to indicate it was recovered
* @param alive_change The change requested for alive count. Should be -1, 0 or +1
* @param not_alive_change The change requested for not alive count. Should be -1, 0 or +1
*/
void update_liveliness_changed_status(
GUID_t writer,
RTPSReader* reader,
bool lost);
int32_t alive_change,
int32_t not_alive_change);

#if HAVE_SECURITY
//!Pointer to the builtinRTPSParticipantMEssageWriter.
Expand Down
16 changes: 14 additions & 2 deletions include/fastrtps/rtps/writer/LivelinessData.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ namespace rtps {
*/
struct LivelinessData
{
enum WriterStatus
{
//! Writer is matched but liveliness has not been asserted yet
NOT_ASSERTED = 0,
//! Writer is alive
ALIVE = 1,
//! Writer is not alive
NOT_ALIVE = 2
};

/**
* @brief Constructor
* @param guid_in GUID of the writer
Expand All @@ -46,12 +56,14 @@ struct LivelinessData
: guid(guid_in)
, kind(kind_in)
, lease_duration(lease_duration_in)
, status(WriterStatus::NOT_ASSERTED)
{}

LivelinessData()
: guid()
, kind(AUTOMATIC_LIVELINESS_QOS)
, lease_duration(c_TimeInfinite)
, status(WriterStatus::NOT_ASSERTED)
{}

~LivelinessData()
Expand Down Expand Up @@ -91,8 +103,8 @@ struct LivelinessData
//! The number of times the writer is being counted
unsigned int count = 1;

//! True if the writer is alive, false otherwise
bool alive = false;
//! The writer status
WriterStatus status;

//! The time when the writer will lose liveliness
std::chrono::steady_clock::time_point time;
Expand Down
37 changes: 16 additions & 21 deletions include/fastrtps/rtps/writer/LivelinessManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ namespace eprosima {
namespace fastrtps {
namespace rtps {

using LivelinessCallback = std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&,
int32_t alive_change,
int32_t not_alive_change)>;

/**
* @brief A class managing the liveliness of a set of writers. Writers are represented by their LivelinessData
* @details Uses a shared timed event and informs outside classes on liveliness changes
Expand All @@ -39,21 +46,13 @@ class LivelinessManager

/**
* @brief Constructor
* @param liveliness_lost_callback A callback that will be invoked when a writer loses liveliness
* @param liveliness_recovered_callback A callback that will be invoked when a writer recovers liveliness
* @param callback A callback that will be invoked when a writer changes its liveliness status
* @param service The asio I/O service
* @param event_thread The event thread
* @param manage_automatic True to manage writers with automatic liveliness, false otherwise
*/
LivelinessManager(
const std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)>& liveliness_lost_callback,
const std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)>& liveliness_recovered_callback,
const LivelinessCallback& callback,
asio::io_service& service,
const std::thread& event_thread,
bool manage_automatic = true);
Expand Down Expand Up @@ -128,6 +127,11 @@ class LivelinessManager

private:

//! @brief A method responsible for invoking the callback when liveliness is asserted
//! @param writer The liveliness data of the writer asserting liveliness
//!
void assert_writer_liveliness(LivelinessData& writer);

/**
* @brief A method to calculate the time when the next writer is going to lose liveliness
* @details This method is public for testing purposes but it should not be used from outside this class
Expand All @@ -151,17 +155,8 @@ class LivelinessManager
//! A method called if the timer expires
void timer_expired();

//! A callback to inform outside classes that a writer lost its liveliness
std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)> liveliness_lost_callback_;

//! A callback to inform outside classes that a writer recovered its liveliness
std::function<void(
const GUID_t&,
const LivelinessQosPolicyKind&,
const Duration_t&)> liveliness_recovered_callback_;
//! A callback to inform outside classes that a writer changed its liveliness status
LivelinessCallback callback_;

//! A boolean indicating whether we are managing writers with automatic liveliness
bool manage_automatic_;
Expand Down
95 changes: 47 additions & 48 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,34 @@ bool WLP::initWL(RTPSParticipantImpl* p)
pub_liveliness_manager_ = new LivelinessManager(
[&](const GUID_t& guid,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration) -> void
const Duration_t& lease_duration,
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
int alive_count,
int not_alive_count) -> void
{
pub_liveliness_lost(guid, kind, lease_duration);
pub_liveliness_changed(
guid,
kind,
lease_duration,
alive_count,
not_alive_count);
},
nullptr,
mp_participant->getEventResource().getIOService(),
mp_participant->getEventResource().getThread(),
false);

sub_liveliness_manager_ = new LivelinessManager(
[&](const GUID_t& guid,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration) -> void
{
sub_liveliness_lost(guid, kind, lease_duration);
},
[&](const GUID_t& guid, const LivelinessQosPolicyKind& kind, const Duration_t& lease_duration) -> void
const Duration_t& lease_duration,
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
int alive_count,
int not_alive_count) -> void
{
sub_liveliness_recovered(guid, kind, lease_duration);
sub_liveliness_changed(
guid,
kind,
lease_duration,
alive_count,
not_alive_count);
},
mp_participant->getEventResource().getIOService(),
mp_participant->getEventResource().getThread());
Expand Down Expand Up @@ -813,12 +822,21 @@ bool WLP::assert_liveliness(
lease_duration);
}

void WLP::pub_liveliness_lost(
void WLP::pub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration)
const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change)
{
(void)lease_duration;
(void)alive_change;

// On the publishing side we only have to notify if one of our writers loses liveliness
if (not_alive_change != 1)
{
return;
}

if (kind == AUTOMATIC_LIVELINESS_QOS)
{
Expand Down Expand Up @@ -882,34 +900,12 @@ void WLP::pub_liveliness_lost(
}
}

void WLP::sub_liveliness_lost(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration)
{
// Writer with given guid lost liveliness, check which readers were matched and inform them

RemoteWriterAttributes ratt;
ratt.guid = writer;

for (RTPSReader* reader : readers_)
{
if (reader->liveliness_kind_ == kind &&
reader->liveliness_lease_duration_ == lease_duration)
{
if (reader->matched_writer_is_matched(ratt))
{
update_liveliness_changed_status(writer, reader, true);
}
}
}
}

void WLP::sub_liveliness_recovered(
void WLP::sub_liveliness_changed(
const GUID_t& writer,
const LivelinessQosPolicyKind& kind,
const Duration_t& lease_duration)

const Duration_t& lease_duration,
int32_t alive_change,
int32_t not_alive_change)
{
// Writer with given guid lost liveliness, check which readers were matched and inform them

Expand All @@ -923,7 +919,11 @@ void WLP::sub_liveliness_recovered(
{
if (reader->matched_writer_is_matched(ratt))
{
update_liveliness_changed_status(writer, reader, false);
update_liveliness_changed_status(
writer,
reader,
alive_change,
not_alive_change);
}
}
}
Expand All @@ -932,23 +932,22 @@ void WLP::sub_liveliness_recovered(
void WLP::update_liveliness_changed_status(
GUID_t writer,
RTPSReader* reader,
bool lost)
int32_t alive_change,
int32_t not_alive_change)
{
int change = lost ? -1 : 1;

reader->liveliness_changed_status_.alive_count += change;
reader->liveliness_changed_status_.alive_count_change += change;
reader->liveliness_changed_status_.not_alive_count -= change;
reader->liveliness_changed_status_.not_alive_count_change -= change;
reader->liveliness_changed_status_.alive_count += alive_change;
reader->liveliness_changed_status_.alive_count_change += alive_change;
reader->liveliness_changed_status_.not_alive_count += not_alive_change;
reader->liveliness_changed_status_.not_alive_count_change += not_alive_change;
reader->liveliness_changed_status_.last_publication_handle = writer;

if (reader->getListener() != nullptr)
{
reader->getListener()->on_liveliness_changed(reader, reader->liveliness_changed_status_);
}

reader->liveliness_changed_status_.alive_count_change = 0;
reader->liveliness_changed_status_.not_alive_count_change = 0;
reader->liveliness_changed_status_.alive_count_change = 0;
reader->liveliness_changed_status_.not_alive_count_change = 0;
}
}

} /* namespace rtps */
Expand Down
Loading