Skip to content

Commit

Permalink
Refs #15841: Return to monitor creation and add error handle
Browse files Browse the repository at this point in the history
Signed-off-by: jparisu <javierparis@eprosima.com>
  • Loading branch information
jparisu committed Oct 18, 2022
1 parent 86b6207 commit aacf5cc
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 95 deletions.
70 changes: 13 additions & 57 deletions src/cpp/Monitor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,24 @@
#include <map>
#include <string>

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantQos.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/DataReaderListener.hpp>
#include <fastdds/dds/subscriber/qos/DataReaderQos.hpp>
#include <fastdds/dds/subscriber/qos/SubscriberQos.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
#include <fastdds/dds/topic/qos/TopicQos.hpp>
#include <fastdds/dds/topic/Topic.hpp>

#include <fastdds_statistics_backend/listener/DomainListener.hpp>
#include <fastdds_statistics_backend/listener/CallbackMask.hpp>
#include <fastdds_statistics_backend/types/EntityId.hpp>

namespace eprosima {
namespace fastdds {
namespace dds {

class DomainParticipant;
class DomainParticipantListener;
class Subscriber;
class Topic;
class DataReader;
class DataReaderListener;

} // namespace dds
} // namespace fastdds

namespace statistics_backend {
namespace details {

Expand All @@ -49,50 +50,6 @@ namespace details {
*/
struct Monitor
{
/**
* @brief Destroy the Monitor object
*
* Destroy every pointer that has been set.
* This method works even if the monitor creation has failed
*
* @warning this may not be the best way to implement the destruction of subentities, as they are not created
* under this class. But it is very convenience so it is reused during Monitor creation in case an error occurs
* and also it is used to normally destroy the Monitor.
*/
~Monitor()
{
// These values are not always set, as could come from an error creating Monitor, or for test sake.
if (participant)
{
if (subscriber)
{
for (auto& reader : readers)
{
subscriber->delete_datareader(reader.second);
}

participant->delete_subscriber(subscriber);
}

for (auto& topic : topics)
{
participant->delete_topic(topic.second);
}

fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant);
}

if (reader_listener)
{
delete reader_listener;
}

if (participant_listener)
{
delete participant_listener;
}
}

//! The EntityId of the monitored domain
EntityId id{};

Expand All @@ -112,7 +69,6 @@ struct Monitor
//! It will process the entity discoveries
fastdds::dds::DomainParticipantListener* participant_listener = nullptr;


//! The participant created to communicate with the statistics reporting publishers in this monitor
fastdds::dds::Subscriber* subscriber = nullptr;

Expand Down
115 changes: 85 additions & 30 deletions src/cpp/StatisticsBackend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ void find_or_create_topic_and_type(
{
if (topic_desc->get_type_name() != type->getName())
{
details::StatisticsBackendData::get_instance()->unlock();
throw Error(topic_name + " is not using expected type " + type->getName() +
" and is using instead type " + topic_desc->get_type_name());
}
Expand All @@ -107,7 +106,6 @@ void find_or_create_topic_and_type(
catch (const std::bad_cast& e)
{
// TODO[ILG]: Could we support other TopicDescription types in this context?
details::StatisticsBackendData::get_instance()->unlock();
throw Error(topic_name + " is already used but is not a simple Topic: " + e.what());
}

Expand All @@ -117,7 +115,6 @@ void find_or_create_topic_and_type(
if (ReturnCode_t::RETCODE_PRECONDITION_NOT_MET == monitor->participant->register_type(type, type->getName()))
{
// Name already in use
details::StatisticsBackendData::get_instance()->unlock();
throw Error(std::string("Type name ") + type->getName() + " is already in use");
}
monitor->topics[topic_name] =
Expand Down Expand Up @@ -182,17 +179,42 @@ EntityId create_and_register_monitor(
const DomainParticipantQos& participant_qos,
const DomainId domain_id = 0)
{
// NOTE: This method is quite awful to read because of the error handle of every entity
// This could be done much nicer encapsulating this in Monitor creation in destruction, but you know...
// Why do not call stop_monitor in error case?, youll ask. Well, mutexes are treated rarely here, as this static
// class locks and unlocks StatisticsBackendData mutex, what makes very difficult to do some coherent
// calls from one to another.
// What should happen is that all this logic is moved to StatisticsBackendData. You know, some day...

details::StatisticsBackendData::get_instance()->lock();

// Create monitor instance.
// NOTE: register in database at the end, in case any creation fails
std::shared_ptr<details::Monitor> monitor = std::make_shared<details::Monitor>();
std::shared_ptr<database::Domain> domain = std::make_shared<database::Domain>(domain_name);

try
{
domain->id = details::StatisticsBackendData::get_instance()->database_->insert(domain);
}
catch (const std::exception&)
{
details::StatisticsBackendData::get_instance()->unlock();
throw;
}
// TODO: in case this function fails afterwards, the domain will be kept in the database without associated
// Participant. There must exist a way in database to delete a domain, or to make a rollback.

monitor->id = domain->id;
monitor->domain_listener = domain_listener;
monitor->domain_callback_mask = callback_mask;
monitor->data_mask = data_mask;
details::StatisticsBackendData::get_instance()->monitors_by_entity_[domain->id] = monitor;

monitor->participant_listener = new subscriber::StatisticsParticipantListener(
domain->id,
details::StatisticsBackendData::get_instance()->database_.get(),
details::StatisticsBackendData::get_instance()->entity_queue_,
details::StatisticsBackendData::get_instance()->data_queue_);
monitor->reader_listener = new subscriber::StatisticsReaderListener(
details::StatisticsBackendData::get_instance()->data_queue_);

Expand All @@ -207,6 +229,11 @@ EntityId create_and_register_monitor(

if (monitor->participant == nullptr)
{
// Remove those elements that have been set
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create participant");
}
Expand All @@ -219,19 +246,48 @@ EntityId create_and_register_monitor(

if (monitor->subscriber == nullptr)
{
// Remove those elements that have been set
DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create subscriber");
}

for (const auto& topic : topics)
{
/* Register the type and topic*/
register_statistics_type_and_topic(monitor, topic);

if (monitor->topics[topic] == nullptr)
try
{
register_statistics_type_and_topic(monitor, topic);
}
catch (const std::exception& e)
{
// Remove those elements that have been set
for (auto& it : monitor->readers)
{
if (nullptr != it.second)
{
monitor->subscriber->delete_datareader(it.second);
}
}
for (auto& it : monitor->topics)
{
if (nullptr != it.second)
{
monitor->participant->delete_topic(it.second);
}
}
monitor->participant->delete_subscriber(monitor->subscriber);
DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create topic " + std::string(topic));
throw Error("Error registering topic " + std::string(topic) + " : " + e.what());
}

/* Create DataReaders */
Expand All @@ -243,33 +299,32 @@ EntityId create_and_register_monitor(

if (monitor->readers[topic] == nullptr)
{
// Remove those elements that have been set
for (auto& it : monitor->readers)
{
if (nullptr != it.second)
{
monitor->subscriber->delete_datareader(it.second);
}
}
for (auto& it : monitor->topics)
{
if (nullptr != it.second)
{
monitor->participant->delete_topic(it.second);
}
}
monitor->participant->delete_subscriber(monitor->subscriber);
DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
delete monitor->reader_listener;
delete monitor->participant_listener;
details::StatisticsBackendData::get_instance()->monitors_by_entity_.erase(domain->id);

details::StatisticsBackendData::get_instance()->unlock();
throw Error("Error initializing monitor. Could not create reader for topic " + std::string(topic));
}
}

// Insert domain entity in database
try
{
domain->id = details::StatisticsBackendData::get_instance()->database_->insert(domain);
}
catch (const std::exception&)
{
details::StatisticsBackendData::get_instance()->unlock();
throw;
}

// Insert monitor as a new monitor entity.
// NOTE: Monitor Id is only set after insert domain in database
monitor->id = domain->id;
details::StatisticsBackendData::get_instance()->monitors_by_entity_[domain->id] = monitor;

monitor->participant_listener = new subscriber::StatisticsParticipantListener(
domain->id,
details::StatisticsBackendData::get_instance()->database_.get(),
details::StatisticsBackendData::get_instance()->entity_queue_,
details::StatisticsBackendData::get_instance()->data_queue_);

details::StatisticsBackendData::get_instance()->unlock();
return domain->id;
}
Expand Down
37 changes: 35 additions & 2 deletions src/cpp/StatisticsBackendData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <fastdds/dds/domain/DomainParticipant.hpp>
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
#include <fastdds/dds/domain/qos/DomainParticipantFactoryQos.hpp>
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
#include <fastdds/dds/subscriber/DataReader.hpp>
#include <fastdds/dds/subscriber/Subscriber.hpp>
Expand Down Expand Up @@ -51,7 +52,10 @@ StatisticsBackendData::StatisticsBackendData()
, lock_(mutex_, std::defer_lock)
, participant_factory_instance_(eprosima::fastdds::dds::DomainParticipantFactory::get_shared_instance())
{
// Do nothing
// Set in DomainParticipantFactory that entities are created disabled
eprosima::fastdds::dds::DomainParticipantFactoryQos qos;
participant_factory_instance_->get_qos(qos);
qos.entity_factory().autoenable_created_entities = false;
}

StatisticsBackendData::~StatisticsBackendData()
Expand Down Expand Up @@ -363,7 +367,36 @@ void StatisticsBackendData::stop_monitor(
monitors_by_entity_.erase(it);

// Delete everything created during monitor initialization
monitor.reset();
// These values are not always set, as could come from an error creating Monitor, or for test sake.
if (monitor->participant)
{
if (monitor->subscriber)
{
for (auto& reader : monitor->readers)
{
monitor->subscriber->delete_datareader(reader.second);
}

monitor->participant->delete_subscriber(monitor->subscriber);
}

for (auto& topic : monitor->topics)
{
monitor->participant->delete_topic(topic.second);
}

fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(monitor->participant);
}

if (monitor->reader_listener)
{
delete monitor->reader_listener;
}

if (monitor->participant_listener)
{
delete monitor->participant_listener;
}

// The monitor is inactive
// NOTE: for test sake, this is not always set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ class DomainParticipant
const std::string& type_name
));

MOCK_METHOD0(
enable,
ReturnCode_t
());

DomainParticipantQos qos_;
DomainId_t domain_id_;
eprosima::fastrtps::rtps::GUID_t guid_;
Expand Down
Loading

0 comments on commit aacf5cc

Please sign in to comment.