Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] new-datarw-connection-logic #1729

Merged
merged 5 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,7 @@ namespace eCAL
{
iter->second->ApplyLayerParameter(publication_info, tlayer.type, tlayer.par_layer);
}
// we only inform the subscriber when the publisher has already recognized at least one subscriber
// this should avoid to set the "IsPublished" state before the publisher is able to send data
const bool local_publication = publication_info.host_name == Process::GetHostName();
const bool external_publication = !local_publication;
const bool local_confirmed = local_publication && (ecal_sample_.topic.connections_loc > 0);
const bool external_confirmed = external_publication && (ecal_sample_.topic.connections_ext > 0);
if(local_confirmed || external_confirmed)
{
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
}
iter->second->ApplyPublication(publication_info, topic_information, layer_states);
}
}

Expand Down
195 changes: 125 additions & 70 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ namespace eCAL
m_topic_info(topic_info_),
m_topic_size(0),
m_config(config_),
m_connected(false),
m_receive_time(0),
m_clock(0),
m_frequency_calculator(3.0f),
Expand Down Expand Up @@ -277,35 +276,91 @@ namespace eCAL
m_id_set = id_set_;
}

void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_)
void CDataReader::ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& pub_layer_states_)
{
// flag write enabled from publisher side (information not used yet)
#if ECAL_CORE_TRANSPORT_UDP
m_layers.udp.write_enabled = layer_states_.udp.write_enabled;
m_layers.udp.write_enabled = pub_layer_states_.udp.write_enabled;
#endif
#if ECAL_CORE_TRANSPORT_SHM
m_layers.shm.write_enabled = layer_states_.shm.write_enabled;
m_layers.shm.write_enabled = pub_layer_states_.shm.write_enabled;
#endif
#if ECAL_CORE_TRANSPORT_TCP
m_layers.tcp.write_enabled = layer_states_.tcp.write_enabled;
m_layers.tcp.write_enabled = pub_layer_states_.tcp.write_enabled;
#endif

FireConnectEvent(publication_info_.entity_id, data_type_info_);
// add key to connection map, including connection state
bool is_new_connection = false;
bool is_updated_connection = false;
{
const std::lock_guard<std::mutex> lock(m_connection_map_mtx);
auto publication_info_iter = m_connection_map.find(publication_info_);

if (publication_info_iter == m_connection_map.end())
{
// add publisher to connection map, connection state false
m_connection_map[publication_info_] = SConnection{ data_type_info_, pub_layer_states_, false };
}
else
{
// existing connection, we got the second update now
auto& connection = publication_info_iter->second;

// add key to publisher map
// if this connection was inactive before
// activate it now and flag a new connection finally
if (!connection.state)
{
is_new_connection = true;
}
// the connection was active, so we just update it
else
{
is_updated_connection = true;
}

// update the data type and layer states, even if the connection is not new
connection = SConnection{ data_type_info_, pub_layer_states_, true };
}
}

// handle these events outside the lock
if (is_new_connection)
{
// fire connect event
FireConnectEvent(publication_info_.entity_id, data_type_info_);
}
else if (is_updated_connection)
{
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map[publication_info_] = std::make_tuple(data_type_info_, layer_states_);
// fire update event
FireUpdateEvent(publication_info_.entity_id, data_type_info_);
}

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataReader::ApplyPublication");
#endif
}

void CDataReader::RemovePublication(const SPublicationInfo& publication_info_)
{
// remove key from publisher map
// remove key from connection map
bool last_connection_gone(false);
{
const std::lock_guard<std::mutex> lock(m_connection_map_mtx);
m_connection_map.erase(publication_info_);
last_connection_gone = m_connection_map.empty();
}

if (last_connection_gone)
{
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
m_pub_map.erase(publication_info_);
// fire disconnect event
FireDisconnectEvent();
}

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug3, m_topic_name + "::CDataReader::RemovePublication");
#endif
}

void CDataReader::ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_)
Expand Down Expand Up @@ -540,23 +595,37 @@ namespace eCAL
#endif // ECAL_CORE_REGISTRATION
}

void CDataReader::CheckConnections()
Registration::Sample CDataReader::GetRegistration()
{
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
// return registration
return GetRegistrationSample();
}

if (m_pub_map.empty())
bool CDataReader::IsPublished() const
{
std::lock_guard<std::mutex> const lock(m_connection_map_mtx);
for (const auto& sub : m_connection_map)
{
FireDisconnectEvent();
if (sub.second.state)
{
return true;
}
}
return false;
}

Registration::Sample CDataReader::GetRegistration()
size_t CDataReader::GetPublisherCount() const
{
// check connection timeouts
CheckConnections();

// return registration
return GetRegistrationSample();
std::lock_guard<std::mutex> const lock(m_connection_map_mtx);
size_t count = 0;
for (const auto& sub : m_connection_map)
{
if (sub.second.state)
{
count++;
}
}
return count;
}

Registration::Sample CDataReader::GetRegistrationSample()
Expand Down Expand Up @@ -732,61 +801,47 @@ namespace eCAL

void CDataReader::FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_)
{
SSubEventCallbackData data;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;

if (!m_connected)
{
m_connected = true;

// fire sub_event_connected
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_connected);
if (iter != m_event_callback_map.end() && iter->second)
{
data.type = sub_event_connected;
data.tid = tid_;
data.tdatatype = tinfo_;
(iter->second)(m_topic_name.c_str(), &data);
}
}
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_connected);
if (iter != m_event_callback_map.end() && iter->second)
{
SSubEventCallbackData data;
data.type = sub_event_connected;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tid = tid_;
data.tdatatype = tinfo_;
(iter->second)(m_topic_name.c_str(), &data);
}
}

// fire sub_event_update_connection
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_update_connection);
if (iter != m_event_callback_map.end() && iter->second)
{
data.type = sub_event_update_connection;
data.tid = tid_;
data.tdatatype = tinfo_;
(iter->second)(m_topic_name.c_str(), &data);
}
void CDataReader::FireUpdateEvent(const std::string& tid_, const SDataTypeInformation& tinfo_)
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_update_connection);
if (iter != m_event_callback_map.end() && iter->second)
{
SSubEventCallbackData data;
data.type = sub_event_update_connection;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
data.tid = tid_;
data.tdatatype = tinfo_;
(iter->second)(m_topic_name.c_str(), &data);
}
}

void CDataReader::FireDisconnectEvent()
{
if (m_connected)
{
m_connected = false;

// fire sub_event_disconnected
{
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_disconnected);
if (iter != m_event_callback_map.end() && iter->second)
{
SSubEventCallbackData data;
data.type = sub_event_disconnected;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
(iter->second)(m_topic_name.c_str(), &data);
}
}
const std::lock_guard<std::mutex> lock(m_event_callback_map_mtx);
auto iter = m_event_callback_map.find(sub_event_disconnected);
if (iter != m_event_callback_map.end() && iter->second)
{
SSubEventCallbackData data;
data.type = sub_event_disconnected;
data.time = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
data.clock = 0;
(iter->second)(m_topic_name.c_str(), &data);
}
}

Expand Down
32 changes: 13 additions & 19 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
#include <queue>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>

namespace eCAL
Expand Down Expand Up @@ -82,25 +81,16 @@ namespace eCAL

void SetID(const std::set<long long>& id_set_);

void ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& layer_states_);
void ApplyPublication(const SPublicationInfo& publication_info_, const SDataTypeInformation& data_type_info_, const SLayerStates& pub_layer_states_);
void RemovePublication(const SPublicationInfo& publication_info_);

void ApplyLayerParameter(const SPublicationInfo& publication_info_, eTLayerType type_, const Registration::ConnectionPar& parameter_);

Registration::Sample GetRegistration();
bool IsCreated() const { return(m_created); }

bool IsPublished() const
{
std::lock_guard<std::mutex> const lock(m_pub_map_mtx);
return(!m_pub_map.empty());
}

size_t GetPublisherCount() const
{
const std::lock_guard<std::mutex> lock(m_pub_map_mtx);
return(m_pub_map.size());
}
bool IsPublished() const;
size_t GetPublisherCount() const;

std::string GetTopicName() const { return(m_topic_name); }
std::string GetTopicID() const { return(m_topic_id); }
Expand All @@ -115,15 +105,14 @@ namespace eCAL
void Register();
void Unregister();

void CheckConnections();

Registration::Sample GetRegistrationSample();
Registration::Sample GetUnregistrationSample();

void StartTransportLayer();
void StopTransportLayer();

void FireConnectEvent(const std::string& tid_, const SDataTypeInformation& tinfo_);
void FireUpdateEvent(const std::string& tid_, const SDataTypeInformation& tinfo_);
void FireDisconnectEvent();

bool CheckMessageClock(const std::string& tid_, long long current_clock_);
Expand All @@ -141,10 +130,15 @@ namespace eCAL
std::atomic<size_t> m_topic_size;
Subscriber::Configuration m_config;

std::atomic<bool> m_connected;
using PublicationMapT = std::map<SPublicationInfo, std::tuple<SDataTypeInformation, SLayerStates>>;
mutable std::mutex m_pub_map_mtx;
PublicationMapT m_pub_map;
struct SConnection
{
SDataTypeInformation data_type_info;
SLayerStates layer_states;
bool state = false;
};
using PublicationMapT = std::map<SPublicationInfo, SConnection>;
mutable std::mutex m_connection_map_mtx;
PublicationMapT m_connection_map;

mutable std::mutex m_read_buf_mtx;
std::condition_variable m_read_buf_cv;
Expand Down
Loading
Loading