Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Reduce allocation / deallocations necessary for Registration. #1744

Merged
merged 10 commits into from
Sep 26, 2024
1 change: 1 addition & 0 deletions ecal/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ endif()
set(ecal_util_src
src/util/ecal_expmap.h
src/util/ecal_thread.h
src/util/expanding_vector.h
src/util/frequency_calculator.h
src/util/getenvvar.h
)
Expand Down
7 changes: 7 additions & 0 deletions ecal/core/include/ecal/ecal_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ namespace eCAL
{
return std::tie(name, encoding, descriptor) < std::tie(rhs.name, rhs.encoding, rhs.descriptor);
}

void clear()
{
name.clear();
encoding.clear();
descriptor.clear();
}
//!< @endcond
};

Expand Down
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_pubgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ namespace eCAL
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datawriter_sync);
for (const auto& iter : m_topic_name_datawriter_map)
{
reg_sample_list_.samples.emplace_back(iter.second->GetRegistration());
iter.second->GetRegistration(reg_sample_list_.push_back());
}
}
}
2 changes: 1 addition & 1 deletion ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ namespace eCAL
const std::shared_lock<std::shared_timed_mutex> lock(m_topic_name_datareader_sync);
for (const auto& iter : m_topic_name_datareader_map)
{
reg_sample_list_.samples.emplace_back(iter.second->GetRegistration());
iter.second->GetRegistration(reg_sample_list_.push_back());
}
}
}
24 changes: 10 additions & 14 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,9 @@ namespace eCAL
void CDataReader::Register()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample());
Registration::Sample sample;
GetRegistrationSample(sample);
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(sample);

#ifndef NDEBUG
// log it
Expand All @@ -606,7 +608,9 @@ namespace eCAL
void CDataReader::Unregister()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample());
Registration::Sample sample;
GetUnregistrationSample(sample);
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(sample);

#ifndef NDEBUG
// log it
Expand All @@ -615,10 +619,10 @@ namespace eCAL
#endif // ECAL_CORE_REGISTRATION
}

Registration::Sample CDataReader::GetRegistration()
void CDataReader::GetRegistration(Registration::Sample& sample)
{
// return registration
return GetRegistrationSample();
return GetRegistrationSample(sample);
}

bool CDataReader::IsPublished() const
Expand All @@ -631,10 +635,8 @@ namespace eCAL
return m_connection_count;
}

Registration::Sample CDataReader::GetRegistrationSample()
void CDataReader::GetRegistrationSample(Registration::Sample& ecal_reg_sample)
{
// create registration sample
Registration::Sample ecal_reg_sample;
ecal_reg_sample.cmd_type = bct_reg_subscriber;

auto& ecal_reg_sample_identifier = ecal_reg_sample.identifier;
Expand Down Expand Up @@ -706,14 +708,10 @@ namespace eCAL
// we do not know the number of connections ..
ecal_reg_sample_topic.connections_loc = 0;
ecal_reg_sample_topic.connections_ext = 0;

return ecal_reg_sample;
}

Registration::Sample CDataReader::GetUnregistrationSample()
void CDataReader::GetUnregistrationSample(Registration::Sample& ecal_unreg_sample)
{
// create unregistration sample
Registration::Sample ecal_unreg_sample;
ecal_unreg_sample.cmd_type = bct_unreg_subscriber;

auto& ecal_reg_sample_identifier = ecal_unreg_sample.identifier;
Expand All @@ -726,8 +724,6 @@ namespace eCAL
ecal_reg_sample_topic.pname = m_pname;
ecal_reg_sample_topic.tname = m_topic_name;
ecal_reg_sample_topic.uname = Process::GetUnitName();

return ecal_unreg_sample;
}

void CDataReader::StartTransportLayer()
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ namespace eCAL

void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_);

Registration::Sample GetRegistration();
void GetRegistration(Registration::Sample& sample);
bool IsCreated() const { return(m_created); }

bool IsPublished() const;
Expand Down Expand Up @@ -115,8 +115,8 @@ namespace eCAL
void Register();
void Unregister();

Registration::Sample GetRegistrationSample();
Registration::Sample GetUnregistrationSample();
void GetRegistrationSample(Registration::Sample& sample);
void GetUnregistrationSample(Registration::Sample& sample);

void StartTransportLayer();
void StopTransportLayer();
Expand Down
24 changes: 10 additions & 14 deletions ecal/core/src/readwrite/ecal_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,9 @@ namespace eCAL
void CDataWriter::Register()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample());
Registration::Sample registration_sample;
GetRegistrationSample(registration_sample);
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(registration_sample);

#ifndef NDEBUG
// log it
Expand All @@ -640,7 +642,9 @@ namespace eCAL
void CDataWriter::Unregister()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample());
Registration::Sample unregistration_sample;
GetUnregistrationSample(unregistration_sample);
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(unregistration_sample);

#ifndef NDEBUG
// log it
Expand All @@ -649,15 +653,13 @@ namespace eCAL
#endif // ECAL_CORE_REGISTRATION
}

Registration::Sample CDataWriter::GetRegistration()
void CDataWriter::GetRegistration(Registration::Sample& sample)
{
return GetRegistrationSample();
GetRegistrationSample(sample);
}

Registration::Sample CDataWriter::GetRegistrationSample()
void CDataWriter::GetRegistrationSample(Registration::Sample& ecal_reg_sample)
{
// create registration sample
Registration::Sample ecal_reg_sample;
ecal_reg_sample.cmd_type = bct_reg_publisher;

auto& ecal_reg_sample_identifier = ecal_reg_sample.identifier;
Expand Down Expand Up @@ -748,14 +750,10 @@ namespace eCAL
}
ecal_reg_sample_topic.connections_loc = static_cast<int32_t>(loc_connections);
ecal_reg_sample_topic.connections_ext = static_cast<int32_t>(ext_connections);

return ecal_reg_sample;
}

Registration::Sample CDataWriter::GetUnregistrationSample()
void CDataWriter::GetUnregistrationSample(Registration::Sample& ecal_unreg_sample)
{
// create unregistration sample
Registration::Sample ecal_unreg_sample;
ecal_unreg_sample.cmd_type = bct_unreg_publisher;

auto& ecal_reg_sample_identifier = ecal_unreg_sample.identifier;
Expand All @@ -768,8 +766,6 @@ namespace eCAL
ecal_reg_sample_topic.pname = m_pname;
ecal_reg_sample_topic.tname = m_topic_name;
ecal_reg_sample_topic.uname = Process::GetUnitName();

return ecal_unreg_sample;
}

void CDataWriter::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_)
Expand Down
6 changes: 3 additions & 3 deletions ecal/core/src/readwrite/ecal_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ namespace eCAL
void ApplySubscription(const SSubscriptionInfo& subscription_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& sub_layer_states_, const std::string& reader_par_);
void RemoveSubscription(const SSubscriptionInfo& subscription_info_);

Registration::Sample GetRegistration();
void GetRegistration(Registration::Sample& sample);
void RefreshSendCounter();

bool IsCreated() const { return(m_created); }
Expand All @@ -119,8 +119,8 @@ namespace eCAL
void Register();
void Unregister();

Registration::Sample GetRegistrationSample();
Registration::Sample GetUnregistrationSample();
void GetRegistrationSample(Registration::Sample& sample);
void GetUnregistrationSample(Registration::Sample& sample);

bool StartUdpLayer();
bool StartShmLayer();
Expand Down
22 changes: 12 additions & 10 deletions ecal/core/src/registration/ecal_registration_provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <atomic>
#include <chrono>
#include <functional>
#include <iterator>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -136,46 +137,47 @@ namespace eCAL
void CRegistrationProvider::AddSingleSample(const Registration::Sample& sample_)
{
const std::lock_guard<std::mutex> lock(m_applied_sample_list_mtx);
m_applied_sample_list.samples.push_back(sample_);
m_applied_sample_list.push_back(sample_);
}

void CRegistrationProvider::RegisterSendThread()
{
// collect all registrations and send them out cyclic
{
// create sample list
Registration::SampleList sample_list;
m_send_thread_sample_list.clear();

// and add process registration sample
sample_list.samples.push_back(Registration::GetProcessRegisterSample());
m_send_thread_sample_list.push_back(Registration::GetProcessRegisterSample());

#if ECAL_CORE_SUBSCRIBER
// add subscriber registrations
if (g_subgate() != nullptr) g_subgate()->GetRegistrations(sample_list);
if (g_subgate() != nullptr) g_subgate()->GetRegistrations(m_send_thread_sample_list);
#endif

#if ECAL_CORE_PUBLISHER
// add publisher registrations
if (g_pubgate() != nullptr) g_pubgate()->GetRegistrations(sample_list);
if (g_pubgate() != nullptr) g_pubgate()->GetRegistrations(m_send_thread_sample_list);
#endif

#if ECAL_CORE_SERVICE
// add server registrations
if (g_servicegate() != nullptr) g_servicegate()->GetRegistrations(sample_list);
if (g_servicegate() != nullptr) g_servicegate()->GetRegistrations(m_send_thread_sample_list);

// add client registrations
if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(sample_list);
if (g_clientgate() != nullptr) g_clientgate()->GetRegistrations(m_send_thread_sample_list);
#endif

// append applied samples list to sample list
if (!m_applied_sample_list.samples.empty())
if (!m_applied_sample_list.empty())
{
const std::lock_guard<std::mutex> lock(m_applied_sample_list_mtx);
sample_list.samples.splice(sample_list.samples.end(), m_applied_sample_list.samples);
std::copy(m_applied_sample_list.begin(), m_applied_sample_list.end(), std::back_inserter(m_send_thread_sample_list));
m_applied_sample_list.clear();
}

// send collected registration sample list
m_reg_sender->SendSampleList(sample_list);
m_reg_sender->SendSampleList(m_send_thread_sample_list);
}
}
}
2 changes: 2 additions & 0 deletions ecal/core/src/registration/ecal_registration_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ namespace eCAL
std::mutex m_applied_sample_list_mtx;
Registration::SampleList m_applied_sample_list;

Registration::SampleList m_send_thread_sample_list;

Registration::SAttributes m_attributes;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,17 @@ namespace eCAL

void CRegistrationReceiverSHM::Receive()
{
// At the moment this function is called synchronously by a dedicated thread.
// If this changes, we need to protect the sample list member variable
MemfileBroadcastMessageListT message_list;
if (m_memfile_broadcast_reader->Read(message_list, 0))
{
eCAL::Registration::SampleList sample_list;
m_sample_list.clear();
for (const auto& message : message_list)
{
if (DeserializeFromBuffer(static_cast<const char*>(message.data), message.size, sample_list))
if (DeserializeFromBuffer(static_cast<const char*>(message.data), message.size, m_sample_list))
{
for (const auto& sample : sample_list.samples)
for (const auto& sample : m_sample_list)
{
m_apply_sample_callback(sample);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ namespace eCAL
std::unique_ptr<CMemoryFileBroadcastReader> m_memfile_broadcast_reader;
std::unique_ptr<CCallbackThread> m_memfile_broadcast_reader_thread;

eCAL::Registration::SampleList m_sample_list;

RegistrationApplySampleCallbackT m_apply_sample_callback;
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace eCAL
bool CRegistrationSenderUDP::SendSampleList(const Registration::SampleList& sample_list)
{
bool return_value{ true };
for (const auto& sample : sample_list.samples)
for (const auto& sample : sample_list)
{
return_value &= SendSample(sample);
}
Expand Down
Loading
Loading