Skip to content

Commit

Permalink
Assign names to application threads
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Nov 15, 2024
1 parent 777c8cd commit 45eca3e
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 23 deletions.
2 changes: 2 additions & 0 deletions xtransmit/generate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "generate.hpp"
#include "pacer.hpp"
#include "metrics.hpp"
#include "xtr_defs.hpp"

// OpenSRT
#include "apputil.hpp"
Expand All @@ -34,6 +35,7 @@ using shared_sock = std::shared_ptr<socket::isocket>;

void run_pipe(shared_sock dst, const config& cfg, std::function<void(int conn_id)> const& on_done, const atomic_bool& force_break)
{
XTR_THREADNAME(std::string("XTR:Gen"));
vector<char> message_to_send(cfg.message_size);
iota(message_to_send.begin(), message_to_send.end(), (char)0);

Expand Down
2 changes: 2 additions & 0 deletions xtransmit/metrics_writer.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <thread>
#include <mutex>
#include "metrics_writer.hpp"
#include "xtr_defs.hpp"

// submodules
#include "spdlog/spdlog.h"
Expand Down Expand Up @@ -149,6 +150,7 @@ future<void> metrics_writer::launch()
}
};

XTR_THREADNAME(std::string("XTR:Metrics"));
return async(
::launch::async, metrics_func, ref(m_validators), ref(m_file), m_interval, ref(m_lock), ref(m_stop_token));
}
Expand Down
3 changes: 2 additions & 1 deletion xtransmit/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "misc.hpp"
#include "socket_stats.hpp"
#include "srt_socket_group.hpp"

#include "xtr_defs.hpp"
// submodules
#include "spdlog/spdlog.h"

Expand Down Expand Up @@ -177,6 +177,7 @@ class concurrent_pipes
void common_run(const vector<string>& urls, const stats_config& cfg_stats, const conn_config& cfg_conn,
const atomic_bool& break_token, processing_fn_t& processing_fn)
{
//XTR_THREADNAME(std::string("XTR:ConnMngmt"));
if (urls.empty())
{
spdlog::error(LOG_SC_CONN "URL was not provided");
Expand Down
22 changes: 3 additions & 19 deletions xtransmit/receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "receive.hpp"
#include "metrics.hpp"
#include "metrics_writer.hpp"
#include "xtr_defs.hpp"

// OpenSRT
#include "apputil.hpp"
Expand All @@ -36,23 +37,6 @@ using shared_sock = std::shared_ptr<socket::isocket>;
#define LOG_SC_RECEIVE "RECEIVE "


namespace xtransmit
{
namespace details
{
#if defined(_MSC_VER) || __cplusplus >= 201402L // C++14 and beyond
using std::make_unique;
#else
template<typename T, typename... Args>
std::unique_ptr<T> make_unique(Args &&... args)
{
static_assert(!std::is_array<T>::value, "arrays are not supported");
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}
#endif
}
}

void trace_message(const size_t bytes, const vector<char>& buffer, SOCKET conn_id)
{
using std::cout;
Expand Down Expand Up @@ -81,7 +65,7 @@ void trace_message(const size_t bytes, const vector<char>& buffer, SOCKET conn_i

void run_pipe(shared_sock src, const config& cfg, unique_ptr<metrics::metrics_writer>& metrics, std::function<void(int conn_id)> const& on_done, const atomic_bool& force_break)
{
using std::make_shared;
XTR_THREADNAME(std::string("XTR:Rcv"));
socket::isocket& sock = *src.get();
const auto conn_id = sock.id();

Expand All @@ -90,7 +74,7 @@ void run_pipe(shared_sock src, const config& cfg, unique_ptr<metrics::metrics_wr

if (metrics)
{
validator = make_shared<metrics::validator>(conn_id);
validator = std::make_shared<metrics::validator>(conn_id);
metrics->add_validator(validator, conn_id);
}

Expand Down
2 changes: 2 additions & 0 deletions xtransmit/route.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// OpenSRT
#include "apputil.hpp"
#include "uriparser.hpp"
#include "xtr_defs.hpp"

using namespace std;
using namespace xtransmit;
Expand All @@ -39,6 +40,7 @@ namespace route
void route(shared_sock src, shared_sock dst,
const config& cfg, const string&& desc, const atomic_bool& force_break)
{
XTR_THREADNAME(std::string("XTR:Route"));
vector<char> buffer(cfg.message_size);

socket::isocket& sock_src = *src.get();
Expand Down
8 changes: 5 additions & 3 deletions xtransmit/socket_stats.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <thread>
#include "socket_stats.hpp"
#include "xtr_defs.hpp"

// submodules
#include "spdlog/spdlog.h"
Expand Down Expand Up @@ -78,7 +79,7 @@ future<void> xtransmit::socket::stats_writer::launch()
auto print_stats = [](map<SOCKET, shared_sock>& sock_vector,
ofstream& out,
mutex& stats_lock,
string format,
const string& format,
bool print_header)
{
#ifdef ENABLE_CXX17
Expand All @@ -94,7 +95,7 @@ future<void> xtransmit::socket::stats_writer::launch()
continue;
}

auto* s = it.second.get();
const auto* s = it.second.get();

try
{
Expand Down Expand Up @@ -130,7 +131,7 @@ future<void> xtransmit::socket::stats_writer::launch()

auto stats_func = [&print_stats](map<SOCKET, shared_sock>& sock_vector,
ofstream& out,
string& format,
const string& format,
const milliseconds interval,
mutex& stats_lock,
const atomic_bool& stop_stats) {
Expand All @@ -145,5 +146,6 @@ future<void> xtransmit::socket::stats_writer::launch()
}
};

XTR_THREADNAME(std::string("XTR:Stats"));
return async(::launch::async, stats_func, ref(m_sock), ref(m_logfile), ref(m_format), m_interval, ref(m_lock), ref(m_stop));
}
26 changes: 26 additions & 0 deletions xtransmit/xtr_defs.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once
#include "srt.h"

#if SRT_VERSION_VALUE >= SRT_MAKE_VERSION(1, 5, 0)
#include "threadname.h" // srt::ThreadName
#define XTR_THREADNAME(name) srt::ThreadName tn(name);
#else
#define XTR_THREADNAME(name)
#endif

namespace xtransmit
{
namespace details
{
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args&&... args)
{
#if defined(_MSC_VER) || __cplusplus >= 201402L // C++14 and beyond
return std::make_unique<T>(std::forward<Args>(args)...);
#else
static_assert(!std::is_array<T>::value, "arrays are not supported");
return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
#endif
}
} // namespace details
} // namespace xtransmit

0 comments on commit 45eca3e

Please sign in to comment.