Skip to content

Commit

Permalink
Fix for late-joiners KEEP_LAST, RELIABLE, TRANSIENT_LOCAL (#1320)
Browse files Browse the repository at this point in the history
* Fix for late-joiners KEEP_LAST, RELIABLE, TRANSIENT_LOCAL (#1314)

* Refs #8896 Add Blackbox Tests

* Large data volatile tests
* Large data transient local tests
* Tests to check #8896 bug (Transient Local, Reliable, Keep Last 1)

Signed-off-by: Laura Martin <laura@eprosima.com>

* Refs #8896 Fix history full check in SubscriberHistory

Signed-off-by: Laura Martin <laura@eprosima.com>

* Refs #8986 Requested Changes

Signed-off-by: Laura Martin <laura@eprosima.com>

* Refs #8986 Separate HistoryAttributes calculation to a separate method

Signed-off-by: Laura Martin <laura@eprosima.com>

* Uncrustify

Signed-off-by: Laura Martin <laura@eprosima.com>
  • Loading branch information
lauramg15 authored Aug 4, 2020
1 parent 40568fa commit 53a1e7d
Show file tree
Hide file tree
Showing 6 changed files with 556 additions and 160 deletions.
53 changes: 32 additions & 21 deletions src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,35 @@ static void get_sample_info(
info->related_sample_identity = change->write_params.sample_identity();
}

static HistoryAttributes to_history_attributes(
const TopicAttributes& topic_att,
uint32_t payloadMaxSize,
MemoryManagementPolicy_t mempolicy)
{
auto initial_samples = topic_att.resourceLimitsQos.allocated_samples;
auto max_samples = topic_att.resourceLimitsQos.max_samples;

if (topic_att.historyQos.kind != KEEP_ALL_HISTORY_QOS)
{
max_samples = topic_att.historyQos.depth;
if (topic_att.getTopicKind() != NO_KEY)
{
max_samples *= topic_att.resourceLimitsQos.max_instances;
}

initial_samples = std::min(initial_samples, max_samples);
}

return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples);
}

SubscriberHistory::SubscriberHistory(
const TopicAttributes& topic_att,
TopicDataType* type,
const ReaderQos& qos,
uint32_t payloadMaxSize,
MemoryManagementPolicy_t mempolicy)
: ReaderHistory(HistoryAttributes(mempolicy, payloadMaxSize,
topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ?
topic_att.resourceLimitsQos.allocated_samples :
topic_att.getTopicKind() == NO_KEY ?
std::min(topic_att.resourceLimitsQos.allocated_samples, topic_att.historyQos.depth) :
std::min(topic_att.resourceLimitsQos.allocated_samples, topic_att.historyQos.depth
* topic_att.resourceLimitsQos.max_instances),
topic_att.historyQos.kind == KEEP_ALL_HISTORY_QOS ?
topic_att.resourceLimitsQos.max_samples :
topic_att.getTopicKind() == NO_KEY ?
topic_att.historyQos.depth :
topic_att.historyQos.depth * topic_att.resourceLimitsQos.max_instances))
: ReaderHistory(to_history_attributes(topic_att, payloadMaxSize, mempolicy))
, history_qos_(topic_att.historyQos)
, resource_limited_qos_(topic_att.resourceLimitsQos)
, topic_att_(topic_att)
Expand Down Expand Up @@ -220,7 +231,7 @@ bool SubscriberHistory::add_received_change(

if (add_change(a_change))
{
if (m_changes.size() == static_cast<size_t>(resource_limited_qos_.max_samples) )
if (m_changes.size() == static_cast<size_t>(m_att.maximumReservedCaches))
{
m_isHistoryFull = true;
}
Expand Down Expand Up @@ -248,7 +259,7 @@ bool SubscriberHistory::add_received_change_with_key(

if (add_change(a_change))
{
if (m_changes.size() == static_cast<size_t>(resource_limited_qos_.max_samples))
if (m_changes.size() == static_cast<size_t>(m_att.maximumReservedCaches))
{
m_isHistoryFull = true;
}
Expand Down Expand Up @@ -280,7 +291,7 @@ bool SubscriberHistory::find_key_for_change(
bool is_key_protected = false;
#if HAVE_SECURITY
is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected;
#endif
#endif // if HAVE_SECURITY
if (!type_->getKey(get_key_object_, &a_change->instanceHandle, is_key_protected))
{
return false;
Expand Down Expand Up @@ -314,13 +325,13 @@ bool SubscriberHistory::deserialize_change(
if (info != nullptr)
{
if (topic_att_.topicKind == WITH_KEY &&
change->instanceHandle == c_InstanceHandle_Unknown &&
change->kind == ALIVE)
change->instanceHandle == c_InstanceHandle_Unknown &&
change->kind == ALIVE)
{
bool is_key_protected = false;
#if HAVE_SECURITY
is_key_protected = mp_reader->getAttributes().security_attributes().is_key_protected;
#endif
#endif // if HAVE_SECURITY
type_->getKey(data, &change->instanceHandle, is_key_protected);
}

Expand Down Expand Up @@ -534,9 +545,9 @@ bool SubscriberHistory::get_next_deadline(
[](
const std::pair<InstanceHandle_t, KeyedChanges>& lhs,
const std::pair<InstanceHandle_t, KeyedChanges>& rhs)
{
return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
});
{
return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
});
handle = min->first;
next_deadline_us = min->second.next_deadline_us;
return true;
Expand Down
Loading

0 comments on commit 53a1e7d

Please sign in to comment.