Skip to content

Commit

Permalink
[core] registration-provider-refactoring (#1677)
Browse files Browse the repository at this point in the history
* registration provider single threaded sending*
* Register(false) calls removed finally
* registration provider logic reduced to just send registration samples (connection to descgate, monitoring cut)
  • Loading branch information
rex-schilasky authored Jul 29, 2024
1 parent 9110d2a commit 0b60453
Show file tree
Hide file tree
Showing 23 changed files with 261 additions and 222 deletions.
34 changes: 17 additions & 17 deletions ecal/core/cfg/ecal.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# _____ _ _ ____ _ _
# | ____|___| (_)_ __ ___ ___ ___ / ___| / \ | |
# | _| / __| | | '_ \/ __|/ _ \ _____ / _ \ | / _ \ | |
# | |__| (__| | | |_) \__ \ __/ |_____| | __/ |___ / ___ \| |___
# |_____\___|_|_| .__/|___/\___| \___|\____/_/ \_\_____|
# _____ _ _ ____ _ _
# | ____|___| (_)_ __ ___ ___ ___ / ___| / \ | |
# | _| / __| | | '_ \/ __|/ _ \ _____ / _ \ | / _ \ | |
# | |__| (__| | | |_) \__ \ __/ |_____| | __/ |___ / ___ \| |___
# |_____\___|_|_| .__/|___/\___| \___|\____/_/ \_\_____|
# |_|
# _ _ _ __ _ _ _
# __ _| | ___ | |__ __ _| | ___ ___ _ __ / _(_) __ _ _ _ _ __ __ _| |_(_) ___ _ __
Expand All @@ -15,7 +15,7 @@
# Registration layer configuration
registration:
# Topic registration refresh cylce (has to be smaller then registration timeout! Default: 1000)
registration_refresh: 1000
registration_refresh: 1000
# Timeout for topic registration in ms (internal, Default: 60000)
registration_timeout: 60000
# Enable to receive registration information on the same local machine
Expand All @@ -38,12 +38,12 @@ registration:
udp:
enable: true
port: 14000


# Monitoring configuration
monitoring:
# Timeout for topic monitoring in ms (Default: 1000), increase in 1000er steps
timeout: 1000
# Timeout for topic monitoring in ms (Default: 5000), increase in 1000er steps
timeout: 5000
# Topics blacklist as regular expression (will not be monitored)
filter_excl: "^__.*$"
# Topics whitelist as regular expression (will be monitored only) (Default: "")
Expand Down Expand Up @@ -74,7 +74,7 @@ transport_layer:
join_all_interfaces: false
# Windows specific setting to enable receiving UDP traffic with the Npcap based receiver
npcap_enabled: false

# In local mode multicast group and ttl are set by default and are not adjustable
local:
# Multicast group base. All registration and logging is sent on this address
Expand All @@ -86,22 +86,22 @@ transport_layer:
# Multicast group base. All registration and logging is sent on this address
group: "239.0.0.1"
# TTL (hop limit) is used to determine the amount of routers being traversed towards the destination
ttl: 3
ttl: 3

tcp:
tcp:
# Reader amount of threads that shall execute workload
number_executor_reader: 4
# Writer amount of threads that shall execute workload
number_executor_writer: 4
# Reconnection attemps the session will try to reconnect in case of an issue
max_reconnections: 5

shm:
shm:
# Default memory file size for new publisher
memfile_min_size_bytes: 4096
# Dynamic file size reserve before recreating memory file if topic size changes
memfile_reserve_percent: 50


# Publisher specific base settings
publisher:
Expand All @@ -117,12 +117,12 @@ publisher:
acknowledge_timeout_ms: 0
# Maximum number of used buffers (needs to be greater than 1, default = 1)
memfile_buffer_count: 1

# Base configuration for UDP publisher
udp:
# Enable layer
enable: true

# Base configuration for TCP publisher
tcp:
# Enable layer
Expand Down Expand Up @@ -157,7 +157,7 @@ subscriber:

# Enable dropping of payload messages that arrive out of order
drop_out_of_order_messages: true


# Time configuration
time:
Expand Down
24 changes: 12 additions & 12 deletions ecal/core/src/config/default_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const std::string default_config = R"(# _____ _ _
# Registration layer configuration
registration:
# Topic registration refresh cylce (has to be smaller then registration timeout! Default: 1000)
registration_refresh: 1000
registration_refresh: 1000
# Timeout for topic registration in ms (internal, Default: 60000)
registration_timeout: 60000
# Enable to receive registration information on the same local machine
Expand All @@ -42,12 +42,12 @@ const std::string default_config = R"(# _____ _ _
udp:
enable: true
port: 14000
# Monitoring configuration
monitoring:
# Timeout for topic monitoring in ms (Default: 1000), increase in 1000er steps
timeout: 1000
# Timeout for topic monitoring in ms (Default: 5000), increase in 1000er steps
timeout: 5000
# Topics blacklist as regular expression (will not be monitored)
filter_excl: "^__.*$"
# Topics whitelist as regular expression (will be monitored only) (Default: "")
Expand Down Expand Up @@ -78,7 +78,7 @@ const std::string default_config = R"(# _____ _ _
join_all_interfaces: false
# Windows specific setting to enable receiving UDP traffic with the Npcap based receiver
npcap_enabled: false
# In local mode multicast group and ttl are set by default and are not adjustable
local:
# Multicast group base. All registration and logging is sent on this address
Expand All @@ -90,22 +90,22 @@ const std::string default_config = R"(# _____ _ _
# Multicast group base. All registration and logging is sent on this address
group: "239.0.0.1"
# TTL (hop limit) is used to determine the amount of routers being traversed towards the destination
ttl: 3
ttl: 3
tcp:
tcp:
# Reader amount of threads that shall execute workload
number_executor_reader: 4
# Writer amount of threads that shall execute workload
number_executor_writer: 4
# Reconnection attemps the session will try to reconnect in case of an issue
max_reconnections: 5
shm:
shm:
# Default memory file size for new publisher
memfile_min_size_bytes: 4096
# Dynamic file size reserve before recreating memory file if topic size changes
memfile_reserve_percent: 50
# Publisher specific base settings
publisher:
Expand All @@ -121,12 +121,12 @@ const std::string default_config = R"(# _____ _ _
acknowledge_timeout_ms: 0
# Maximum number of used buffers (needs to be greater than 1, default = 1)
memfile_buffer_count: 1
# Base configuration for UDP publisher
udp:
# Enable layer
enable: true
# Base configuration for TCP publisher
tcp:
# Enable layer
Expand Down Expand Up @@ -161,7 +161,7 @@ const std::string default_config = R"(# _____ _ _
# Enable dropping of payload messages that arrive out of order
drop_out_of_order_messages: true
# Time configuration
time:
Expand Down
6 changes: 2 additions & 4 deletions ecal/core/src/ecal_globals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,7 @@ namespace eCAL
if (descgate_instance)
{
#if ECAL_CORE_REGISTRATION
// utilize registration provider and receiver to get descriptions
g_registration_provider()->SetCustomApplySampleCallback("descgate", [](const auto& sample_) {g_descgate()->ApplySample(sample_, tl_none); });
// utilize registration receiver to get descriptions
g_registration_receiver()->SetCustomApplySampleCallback("descgate", [](const auto& sample_) {g_descgate()->ApplySample(sample_, tl_none); });
#endif
}
Expand Down Expand Up @@ -303,8 +302,7 @@ namespace eCAL
if (descgate_instance)
{
#if ECAL_CORE_REGISTRATION
// stop registration provider and receiver utilization to get descriptions
g_registration_provider()->RemCustomApplySampleCallback("descgate");
// stop registration receiver utilization to get descriptions
g_registration_receiver()->RemCustomApplySampleCallback("descgate");
#endif
}
Expand Down
6 changes: 2 additions & 4 deletions ecal/core/src/monitoring/ecal_monitoring_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ namespace eCAL
// get name of this host
m_host_name = Process::GetHostName();

// utilize registration provider and receiver to enrich monitor information
g_registration_provider()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_) {this->ApplySample(sample_, tl_none); });
// utilize registration receiver to enrich monitor information
g_registration_receiver()->SetCustomApplySampleCallback("monitoring", [this](const auto& sample_){this->ApplySample(sample_, tl_none);});

// setup blacklist and whitelist filter strings#
Expand All @@ -74,8 +73,7 @@ namespace eCAL

void CMonitoringImpl::Destroy()
{
// stop registration provider and receiver utilization to enrich monitor information
g_registration_provider()->RemCustomApplySampleCallback("monitoring");
// stop registration receiver utilization to enrich monitor information
g_registration_receiver()->RemCustomApplySampleCallback("monitoring");
m_init = false;
}
Expand Down
4 changes: 0 additions & 4 deletions ecal/core/src/pubsub/ecal_subgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,6 @@ namespace eCAL
void CSubGate::Start()
{
if(m_created) return;

// initialize data reader layers
CDataReader::InitializeLayers();

m_created = true;
}

Expand Down
27 changes: 7 additions & 20 deletions ecal/core/src/readwrite/ecal_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,11 @@ namespace eCAL
m_pub_map.set_expiration(registration_timeout);

// start transport layers
InitializeLayers();
StartTransportLayer();

// mark as created
m_created = true;

// register
Register(false);
}

CDataReader::~CDataReader()
Expand Down Expand Up @@ -256,36 +254,25 @@ namespace eCAL

bool CDataReader::SetAttribute(const std::string& attr_name_, const std::string& attr_value_)
{
auto current_val = m_attr.find(attr_name_);

const bool force = current_val == m_attr.end() || current_val->second != attr_value_;
m_attr[attr_name_] = attr_value_;

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::SetAttribute");
#endif

// register it
Register(force);

return(true);
}

bool CDataReader::ClearAttribute(const std::string& attr_name_)
{
auto force = m_attr.find(attr_name_) != m_attr.end();

m_attr.erase(attr_name_);

#ifndef NDEBUG
// log it
Logging::Log(log_level_debug2, m_topic_name + "::CDataReader::ClearAttribute");
#endif

// register it
Register(force);

return(true);
}

Expand Down Expand Up @@ -355,23 +342,23 @@ namespace eCAL
{
// initialize udp layer
#if ECAL_CORE_TRANSPORT_UDP
if (Config::IsUdpMulticastRecEnabled())
if (m_config.udp.enable)
{
CUDPReaderLayer::Get()->Initialize();
}
#endif

// initialize shm layer
#if ECAL_CORE_TRANSPORT_SHM
if (Config::IsShmRecEnabled())
if (m_config.shm.enable)
{
CSHMReaderLayer::Get()->Initialize();
}
#endif

// initialize tcp layer
#if ECAL_CORE_TRANSPORT_TCP
if (Config::IsTcpRecEnabled())
if (m_config.tcp.enable)
{
CTCPReaderLayer::Get()->Initialize();
}
Expand Down Expand Up @@ -533,10 +520,10 @@ namespace eCAL
return(out.str());
}

void CDataReader::Register(bool force_)
void CDataReader::Register()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetRegistrationSample(), force_);
if (g_registration_provider() != nullptr) g_registration_provider()->RegisterSample(GetRegistrationSample());

#ifndef NDEBUG
// log it
Expand All @@ -548,7 +535,7 @@ namespace eCAL
void CDataReader::Unregister()
{
#if ECAL_CORE_REGISTRATION
if (g_registration_provider() != nullptr) g_registration_provider()->ApplySample(GetUnregistrationSample(), false);
if (g_registration_provider() != nullptr) g_registration_provider()->UnregisterSample(GetUnregistrationSample());

#ifndef NDEBUG
// log it
Expand Down
4 changes: 2 additions & 2 deletions ecal/core/src/readwrite/ecal_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ namespace eCAL
std::string GetTopicID() const { return(m_topic_id); }
SDataTypeInformation GetDataTypeInformation() const { return(m_topic_info); }

static void InitializeLayers();
void InitializeLayers();
size_t ApplySample(const std::string& tid_, const char* payload_, size_t size_, long long id_, long long clock_, long long time_, size_t hash_, eTLayerType layer_);

std::string Dump(const std::string& indent_ = "");

protected:
void Register(bool force_);
void Register();
void Unregister();

void CheckConnections();
Expand Down
Loading

0 comments on commit 0b60453

Please sign in to comment.