Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SRT: Use thread-safe log for multiple-threading SRT module. #2474

Merged
merged 15 commits into from
Jul 21, 2021
Merged
2 changes: 1 addition & 1 deletion trunk/configure
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ if [[ $SRS_SRT == YES ]]; then
MODULE_ID="SRT"
MODULE_DEPENDS=("CORE" "KERNEL" "PROTOCOL" "APP")
ModuleLibIncs=(${SRS_OBJS_DIR} ${LibSSLRoot} ${LibSRTRoot})
MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp" "ts_demux" "srt_data")
MODULE_FILES=("srt_server" "srt_handle" "srt_conn" "srt_to_rtmp" "ts_demux" "srt_data" "srt_log")
SRT_INCS=(${LibSRTRoot} ${SrsSRTRoot}); MODULE_DIR=${SrsSRTRoot} . auto/modules.sh
SRT_OBJS="${MODULE_OBJS[@]}"
fi
Expand Down
2 changes: 1 addition & 1 deletion trunk/src/kernel/srs_kernel_rtc_rtcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ SrsRtcpRR::SrsRtcpRR(uint32_t sender_ssrc)
header_.version = kRtcpVersion;
header_.length = 7;
ssrc_ = sender_ssrc;
memset(&rb_, 0, sizeof(SrsRtcpRB));
//memset(&rb_, 0, sizeof(SrsRtcpRB));
}

SrsRtcpRR::~SrsRtcpRR()
Expand Down
7 changes: 4 additions & 3 deletions trunk/src/srt/srt_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "srt_conn.hpp"
#include "time_help.hpp"
#include "stringex.hpp"
#include "srt_log.hpp"
#include <vector>

#include <srs_app_config.hpp>
Expand Down Expand Up @@ -134,7 +135,7 @@ srt_conn::srt_conn(SRTSOCKET conn_fd, const std::string& streamid):_conn_fd(conn
} else {
_vhost = "__default_host__";
}
srs_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s",
srt_log_trace("srt connect construct streamid:%s, mode:%d, subpath:%s, vhost:%s",
streamid.c_str(), _mode, _url_subpath.c_str(), _vhost.c_str());
}

Expand Down Expand Up @@ -182,7 +183,7 @@ int srt_conn::read(unsigned char* data, int len) {

ret = srt_recv(_conn_fd, (char*)data, len);
if (ret <= 0) {
srs_error("srt read error:%d, socket fd:%d", ret, _conn_fd);
srt_log_error("srt read error:%d, socket fd:%d", ret, _conn_fd);
return ret;
}
return ret;
Expand All @@ -193,7 +194,7 @@ int srt_conn::write(unsigned char* data, int len) {

ret = srt_send(_conn_fd, (char*)data, len);
if (ret <= 0) {
srs_error("srt write error:%d, socket fd:%d", ret, _conn_fd);
srt_log_error("srt write error:%d, socket fd:%d", ret, _conn_fd);
return ret;
}
return ret;
Expand Down
19 changes: 19 additions & 0 deletions trunk/src/srt/srt_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ SRT_DATA_MSG::SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::s
memcpy(_data_p, data_p, len);
}

SRT_DATA_MSG::SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content): _msg_type(SRT_MSG_LOG_TYPE)
,_log_content(log_content)
,_log_level(log_level)
{

}

SRT_DATA_MSG::~SRT_DATA_MSG() {
if (_data_p && (_len > 0)) {
delete[] _data_p;
Expand All @@ -39,6 +46,10 @@ unsigned int SRT_DATA_MSG::msg_type() {
return _msg_type;
}

void SRT_DATA_MSG::set_msg_type(unsigned int msg_type) {
_msg_type = msg_type;
}

std::string SRT_DATA_MSG::get_path() {
return _key_path;
}
Expand All @@ -50,3 +61,11 @@ unsigned int SRT_DATA_MSG::data_len() {
unsigned char* SRT_DATA_MSG::get_data() {
return _data_p;
}

LOGGER_LEVEL SRT_DATA_MSG::get_log_level() {
return _log_level;
}

const char* SRT_DATA_MSG::get_log_string() {
return _log_content.c_str();
}
13 changes: 11 additions & 2 deletions trunk/src/srt/srt_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,40 @@
#ifndef SRT_DATA_H
#define SRT_DATA_H

#include "srt_log.hpp"
#include <srs_core.hpp>

#include <string>
#include <memory>

#define SRT_MSG_DATA_TYPE 0x01
#define SRT_MSG_CLOSE_TYPE 0x02
#define SRT_MSG_LOG_TYPE 0x03

class SRT_DATA_MSG {
public:
SRT_DATA_MSG(const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(unsigned char* data_p, unsigned int len, const std::string& path, unsigned int msg_type=SRT_MSG_DATA_TYPE);
SRT_DATA_MSG(LOGGER_LEVEL log_level, const std::string& log_content);
~SRT_DATA_MSG();

unsigned int msg_type();
unsigned int data_len();
unsigned char* get_data();
std::string get_path();
LOGGER_LEVEL get_log_level();
const char* get_log_string();

void set_msg_type(unsigned int msg_type);

private:
unsigned int _msg_type;
unsigned int _len;
unsigned char* _data_p;
unsigned int _len = 0;
unsigned char* _data_p = nullptr;
std::string _key_path;
std::string _log_content;
LOGGER_LEVEL _log_level = SRT_LOGGER_TRACE_LEVEL;
};

typedef std::shared_ptr<SRT_DATA_MSG> SRT_DATA_MSG_PTR;
Expand Down
78 changes: 40 additions & 38 deletions trunk/src/srt/srt_handle.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@

#include "srt_handle.hpp"
#include "time_help.hpp"
#include "srt_log.hpp"

#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>

#include <srt/udt.h>
#include <stdio.h>
#include <vector>
Expand All @@ -14,11 +21,6 @@
#include <assert.h>
#include <list>

#include <srs_kernel_log.hpp>
#include <srs_kernel_error.hpp>
#include <srs_app_rtmp_conn.hpp>
#include <srs_app_config.hpp>

static bool MONITOR_STATICS_ENABLE = false;
static long long MONITOR_TIMEOUT = 5000;
const unsigned int DEF_DATA_SIZE = 188*7;
Expand Down Expand Up @@ -66,7 +68,7 @@ void srt_handle::debug_statics(SRTSOCKET srtsocket, const std::string& streamid)
output << "LINK RTT: " << std::setw(9) << mon.msRTT << "ms BANDWIDTH: " << std::setw(7) << mon.mbpsBandwidth << "Mb/s " << std::endl;
output << "BUFFERLEFT: SND: " << std::setw(11) << mon.byteAvailSndBuf << " RCV: " << std::setw(11) << mon.byteAvailRcvBuf << std::endl;

srs_trace("\r\n%s", output.str().c_str());
srt_log_trace("\r\n%s", output.str().c_str());
return;
}

Expand All @@ -79,18 +81,18 @@ void srt_handle::add_new_puller(SRT_CONN_PTR conn_ptr, std::string stream_id) {
srtsocket_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));

_streamid_map.insert(std::make_pair(stream_id, srtsocket_map));
srs_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str());
srt_log_trace("add new puller fd:%d, streamid:%s", conn_ptr->get_conn(), stream_id.c_str());
} else {
iter->second.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srs_trace("add new puller fd:%d, streamid:%s, size:%d",
srt_log_trace("add new puller fd:%d, streamid:%s, size:%d",
conn_ptr->get_conn(), stream_id.c_str(), iter->second.size());
}

return;
}

void srt_handle::close_pull_conn(SRTSOCKET srtsocket, std::string stream_id) {
srs_warn("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str());
srt_log_trace("close_pull_conn read_fd=%d, streamid=%s", srtsocket, stream_id.c_str());
srt_epoll_remove_usock(_handle_pollid, srtsocket);

auto streamid_iter = _streamid_map.find(stream_id);
Expand Down Expand Up @@ -140,36 +142,36 @@ void srt_handle::add_newconn(SRT_CONN_PTR conn_ptr, int events) {
int opt64_len = sizeof(int64_t);

srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_LATENCY, &val_i, &opt_len);
srs_trace("srto SRTO_LATENCY=%d", val_i);
srt_log_trace("srto SRTO_LATENCY=%d", val_i);

srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_PEERLATENCY, &val_i, &opt_len);
srs_trace("srto SRTO_PEERLATENCY=%d", val_i);
srt_log_trace("srto SRTO_PEERLATENCY=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVLATENCY, &val_i, &opt_len);
srs_trace("srto SRTO_RCVLATENCY=%d", val_i);
srt_log_trace("srto SRTO_RCVLATENCY=%d", val_i);

srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_SNDBUF, &val_i, &opt_len);
srs_trace("srto SRTO_SNDBUF=%d", val_i);
srt_log_trace("srto SRTO_SNDBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_RCVBUF, &val_i, &opt_len);
srs_trace("srto SRTO_RCVBUF=%d", val_i);
srt_log_trace("srto SRTO_RCVBUF=%d", val_i);
srt_getsockopt(conn_ptr->get_conn(), 0, SRTO_MAXBW, &val_i64, &opt64_len);
srs_trace("srto SRTO_MAXBW=%d", val_i64);
srs_trace("srt mix_correct is %s.", _srs_config->get_srt_mix_correct() ? "enable" : "disable");
srs_trace("srt h264 sei filter is %s.", _srs_config->get_srt_sei_filter() ? "enable" : "disable");
srt_log_trace("srto SRTO_MAXBW=%d", val_i64);
srt_log_trace("srt mix_correct is %s.", _srs_config->get_srt_mix_correct() ? "enable" : "disable");
srt_log_trace("srt h264 sei filter is %s.", _srs_config->get_srt_sei_filter() ? "enable" : "disable");

if (conn_ptr->get_mode() == PULL_SRT_MODE) {
add_new_puller(conn_ptr, conn_ptr->get_subpath());
} else {
if(add_new_pusher(conn_ptr) == false) {
srs_trace("push connection is repeated and rejected, fd:%d, streamid:%s",
srt_log_trace("push connection is repeated and rejected, fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_streamid().c_str());
conn_ptr->close();
return;
}
}
srs_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events);
srt_log_trace("new conn added fd:%d, event:0x%08x", conn_ptr->get_conn(), events);
int ret = srt_epoll_add_usock(_handle_pollid, conn_ptr->get_conn(), &events);
if (ret < 0) {
srs_error("srt handle run add epoll error:%d", ret);
srt_log_error("srt handle run add epoll error:%d", ret);
return;
}

Expand All @@ -183,19 +185,19 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
srt_conn_ptr = get_srt_conn(conn_fd);

if (!srt_conn_ptr) {
srs_error("handle_push_data fd:%d fail to find srt connection.", conn_fd);
srt_log_error("handle_push_data fd:%d fail to find srt connection.", conn_fd);
return;
}

if (status != SRTS_CONNECTED) {
srs_error("handle_push_data error status:%d fd:%d", status, conn_fd);
srt_log_error("handle_push_data error status:%d fd:%d", status, conn_fd);
close_push_conn(conn_fd);
return;
}

ret = srt_conn_ptr->read(data, DEF_DATA_SIZE);
if (ret <= 0) {
srs_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd);
srt_log_error("handle_push_data srt connect read error:%d, fd:%d", ret, conn_fd);
close_push_conn(conn_fd);
return;
}
Expand All @@ -208,22 +210,22 @@ void srt_handle::handle_push_data(SRT_SOCKSTATUS status, const std::string& subp
//streamid, play map<SRTSOCKET, SRT_CONN_PTR>
auto streamid_iter = _streamid_map.find(subpath);
if (streamid_iter == _streamid_map.end()) {//no puler
srs_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd);
srt_log_info("receive data size(%d) from pusher(%d) but no puller", ret, conn_fd);
return;
}
srs_info("receive data size(%d) from pusher(%d) to pullers, count:%d",
srt_log_info("receive data size(%d) from pusher(%d) to pullers, count:%d",
ret, conn_fd, streamid_iter->second.size());

for (auto puller_iter = streamid_iter->second.begin();
puller_iter != streamid_iter->second.end();
puller_iter++) {
auto player_conn = puller_iter->second;
if (!player_conn) {
srs_error("handle_push_data get srt connect error from fd:%d", puller_iter->first);
srt_log_error("handle_push_data get srt connect error from fd:%d", puller_iter->first);
continue;
}
int write_ret = player_conn->write(data, ret);
srs_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first);
srt_log_info("send data size(%d) to puller fd:%d", write_ret, puller_iter->first);
if (write_ret > 0) {
puller_iter->second->update_timestamp(srt_now_ms);
}
Expand Down Expand Up @@ -261,15 +263,15 @@ void srt_handle::check_alive() {
{
SRT_CONN_PTR conn_ptr = *del_iter;
if (conn_ptr->get_mode() == PUSH_SRT_MODE) {
srs_warn("check alive close pull connection fd:%d, streamid:%s",
srt_log_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
close_push_conn(conn_ptr->get_conn());
} else if (conn_ptr->get_mode() == PULL_SRT_MODE) {
srs_warn("check alive close pull connection fd:%d, streamid:%s",
srt_log_warn("check alive close pull connection fd:%d, streamid:%s",
conn_ptr->get_conn(), conn_ptr->get_subpath().c_str());
close_pull_conn(conn_ptr->get_conn(), conn_ptr->get_subpath());
} else {
srs_error("check_alive get unkown srt mode:%d, fd:%d",
srt_log_error("check_alive get unkown srt mode:%d, fd:%d",
conn_ptr->get_mode(), conn_ptr->get_conn());
assert(0);
}
Expand Down Expand Up @@ -302,17 +304,17 @@ bool srt_handle::add_new_pusher(SRT_CONN_PTR conn_ptr) {
}
_push_conn_map.insert(std::make_pair(conn_ptr->get_subpath(), conn_ptr));
_conn_map.insert(std::make_pair(conn_ptr->get_conn(), conn_ptr));
srs_trace("srt_handle add new pusher streamid:%s, subpath:%s",
srt_log_trace("srt_handle add new pusher streamid:%s, subpath:%s",
conn_ptr->get_streamid().c_str(), conn_ptr->get_subpath().c_str());
return true;
}

void srt_handle::handle_pull_data(SRT_SOCKSTATUS status, const std::string& subpath, SRTSOCKET conn_fd) {
srs_info("handle_pull_data status:%d, subpath:%s, fd:%d",
srt_log_info("handle_pull_data status:%d, subpath:%s, fd:%d",
status, subpath.c_str(), conn_fd);
auto conn_ptr = get_srt_conn(conn_fd);
if (!conn_ptr) {
srs_error("handle_pull_data fail to find fd(%d)", conn_fd);
srt_log_error("handle_pull_data fail to find fd(%d)", conn_fd);
assert(0);
return;
}
Expand All @@ -327,7 +329,7 @@ void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)

if (!conn_ptr) {
if (status != SRTS_CLOSED) {
srs_error("handle_srt_socket find srt connection error, fd:%d, status:%d",
srt_log_error("handle_srt_socket find srt connection error, fd:%d, status:%d",
conn_fd, status);
}
return;
Expand All @@ -349,13 +351,13 @@ void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
}
case SRTS_BROKEN:
{
srs_warn("srt push disconnected event fd:%d, streamid:%s",
srt_log_warn("srt push disconnected event fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_push_conn(conn_fd);
break;
}
default:
srs_error("push mode unkown status:%d, fd:%d", status, conn_fd);
srt_log_error("push mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else if (mode == PULL_SRT_MODE) {
Expand All @@ -368,13 +370,13 @@ void srt_handle::handle_srt_socket(SRT_SOCKSTATUS status, SRTSOCKET conn_fd)
}
case SRTS_BROKEN:
{
srs_warn("srt pull disconnected fd:%d, streamid:%s",
srt_log_warn("srt pull disconnected fd:%d, streamid:%s",
conn_fd, conn_ptr->get_streamid().c_str());
close_pull_conn(conn_fd, subpath);
break;
}
default:
srs_error("pull mode unkown status:%d, fd:%d", status, conn_fd);
srt_log_error("pull mode unkown status:%d, fd:%d", status, conn_fd);
break;
}
} else {
Expand Down
Loading