Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

process err, do error check, add some error code #1642

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions trunk/src/app/srs_app_http_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,15 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage*
// e.g. /api/v1/sdp/ args = json:{"sdp":"sdp...", "app":"webrtc", "stream":"test"}

string req_json;
r->body_read_all(req_json);
if ((err = r->body_read_all(req_json)) != srs_success) {
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}

SrsJsonAny* json = SrsJsonAny::loads(req_json);
if (json == NULL) {
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}

SrsJsonObject* req_obj = json->to_object();

SrsJsonAny* remote_sdp_obj = req_obj->get_property("sdp");
Expand Down Expand Up @@ -832,8 +838,7 @@ srs_error_t SrsGoApiSdp::serve_http(ISrsHttpResponseWriter* w, ISrsHttpMessage*
SrsRtcSession* rtc_session = rtc_server->create_rtc_session(request, remote_sdp, local_sdp);

string local_sdp_str = "";
err = local_sdp.encode(local_sdp_str);
if (err != srs_success) {
if ((err = local_sdp.encode(local_sdp_str)) != srs_success) {
return srs_api_response_code(w, r, SRS_CONSTS_HTTP_BadRequest);
}

Expand Down
23 changes: 12 additions & 11 deletions trunk/src/app/srs_app_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,20 +278,21 @@ int SrsUdpMuxSocket::recvfrom(srs_utime_t timeout)
return nread;
}

int SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout)
srs_error_t SrsUdpMuxSocket::sendto(void* data, int size, srs_utime_t timeout)
{
return srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout);
}
srs_error_t err = srs_success;

int SrsUdpMuxSocket::sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout)
{
struct msghdr udphdr = {0};
udphdr.msg_name = &from;
udphdr.msg_namelen = fromlen;
udphdr.msg_iov = iov;
udphdr.msg_iovlen = iovlen;
int nb_write = srs_sendto(lfd, data, size, (sockaddr*)&from, fromlen, timeout);

if (nb_write <= 0) {
if (nb_write < 0 && errno == ETIME) {
return srs_error_new(ERROR_SOCKET_TIMEOUT, "sendto timeout %d ms", srsu2msi(timeout));
}

return srs_error_new(ERROR_SOCKET_WRITE, "sendto");
}

return srs_sendmsg(lfd, &udphdr, 0, timeout);
return err;
}

std::string SrsUdpMuxSocket::get_peer_id()
Expand Down
3 changes: 1 addition & 2 deletions trunk/src/app/srs_app_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ class SrsUdpMuxSocket
SrsUdpMuxSocket& operator=(const SrsUdpMuxSocket& rhs);

int recvfrom(srs_utime_t timeout);
int sendto(void* data, int size, srs_utime_t timeout);
int sendtov(struct iovec* iov, size_t iovlen, srs_utime_t timeout);
srs_error_t sendto(void* data, int size, srs_utime_t timeout);

char* data() { return buf; }
int size() { return nread; }
Expand Down
122 changes: 75 additions & 47 deletions trunk/src/app/srs_app_rtc_conn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,11 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt)
int ssl_err = SSL_get_error(dtls, ret);
switch(ssl_err) {
case SSL_ERROR_NONE: {
err = on_dtls_handshake_done(udp_mux_skt);
if ((err = on_dtls_handshake_done(udp_mux_skt)) != srs_success) {
return srs_error_wrap(err, "dtls handshake done handle");
}
break;
}
break;

case SSL_ERROR_WANT_READ: {
break;
Expand All @@ -345,7 +347,9 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt)
}

if (out_bio_len) {
udp_mux_skt->sendto(out_bio_data, out_bio_len, 0);
if ((err = udp_mux_skt->sendto(out_bio_data, out_bio_len, 0)) != srs_success) {
return srs_error_wrap(err, "send dtls packet");
}
}

return err;
Expand All @@ -354,23 +358,29 @@ srs_error_t SrsDtlsSession::handshake(SrsUdpMuxSocket* udp_mux_skt)
srs_error_t SrsDtlsSession::on_dtls(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
if (! handshake_done) {
BIO_reset(bio_in);
BIO_reset(bio_out);
BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size());
if (BIO_reset(bio_in) != 1) {
return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset");
}
if (BIO_reset(bio_out) != 1) {
return srs_error_new(ERROR_OpenSslBIOReset, "BIO_reset");
}

handshake(udp_mux_skt);
} else {
BIO_reset(bio_in);
BIO_reset(bio_out);
BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size());
if (BIO_write(bio_in, udp_mux_skt->data(), udp_mux_skt->size()) <= 0) {
// TODO: 0 or -1 maybe block, use BIO_should_retry to check.
return srs_error_new(ERROR_OpenSslBIOWrite, "BIO_write");
}

if (! handshake_done) {
err = handshake(udp_mux_skt);
} else {
while (BIO_ctrl_pending(bio_in) > 0) {
char dtls_read_buf[8092];
int nb = SSL_read(dtls, dtls_read_buf, sizeof(dtls_read_buf));

if (nb > 0) {
on_dtls_application_data(dtls_read_buf, nb);
if ((err =on_dtls_application_data(dtls_read_buf, nb)) != srs_success) {
return srs_error_wrap(err, "dtls application data process");
}
}
}
}
Expand All @@ -389,33 +399,46 @@ srs_error_t SrsDtlsSession::on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt)
return srs_error_wrap(err, "srtp init failed");
}

rtc_session->on_connection_established(udp_mux_skt);

return err;
return rtc_session->on_connection_established(udp_mux_skt);
}

srs_error_t SrsDtlsSession::on_dtls_application_data(const char* buf, const int nb_buf)
{
srs_error_t err = srs_success;

// TODO: process SCTP protocol(WebRTC datachannel support)

return err;
}

void SrsDtlsSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt)
srs_error_t SrsDtlsSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;

if (dtls == NULL) {
srs_verbose("send client hello");

dtls = SSL_new(SrsDtls::instance()->get_dtls_ctx());
if ((dtls = SSL_new(SrsDtls::instance()->get_dtls_ctx())) == NULL) {
return srs_error_new(ERROR_OpenSslCreateSSL, "SSL_new dtls");
}

SSL_set_connect_state(dtls);

bio_in = BIO_new(BIO_s_mem());
bio_out = BIO_new(BIO_s_mem());
if ((bio_in = BIO_new(BIO_s_mem())) == NULL) {
return srs_error_new(ERROR_OpenSslBIONew, "BIO_new in");
}

if ((bio_out = BIO_new(BIO_s_mem())) == NULL) {
BIO_free(bio_in);
return srs_error_new(ERROR_OpenSslBIONew, "BIO_new out");
}

SSL_set_bio(dtls, bio_in, bio_out);

handshake(udp_mux_skt);
return handshake(udp_mux_skt);
}

return err;
}

srs_error_t SrsDtlsSession::srtp_initialize()
Expand Down Expand Up @@ -724,7 +747,7 @@ srs_error_t SrsRtcSession::on_stun(SrsUdpMuxSocket* udp_mux_skt, SrsStunPacket*
srs_error_t err = srs_success;

if (stun_req->is_binding_request()) {
if (on_binding_request(udp_mux_skt, stun_req) != srs_success) {
if ((err = on_binding_request(udp_mux_skt, stun_req)) != srs_success) {
return srs_error_wrap(err, "stun binding request failed");
}
}
Expand Down Expand Up @@ -758,17 +781,20 @@ srs_error_t SrsRtcSession::on_binding_request(SrsUdpMuxSocket* udp_mux_skt, SrsS
stun_binding_response.set_mapped_address(be32toh(inet_addr(udp_mux_skt->get_peer_ip().c_str())));
stun_binding_response.set_mapped_port(udp_mux_skt->get_peer_port());

if (stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream) != srs_success) {
if ((err = stun_binding_response.encode(get_local_sdp()->get_ice_pwd(), stream)) != srs_success) {
return srs_error_wrap(err, "stun binding response encode failed");
}

if (udp_mux_skt->sendto(stream->data(), stream->pos(), 0) <= 0) {
if ((err = udp_mux_skt->sendto(stream->data(), stream->pos(), 0)) != srs_success) {
return srs_error_wrap(err, "stun binding response send failed");
}

if (get_session_state() == WAITING_STUN) {
if ((err = send_client_hello(udp_mux_skt)) != srs_success) {
return srs_error_wrap(err, "send client hello, failed");
}

set_session_state(DOING_DTLS_HANDSHAKE);
send_client_hello(udp_mux_skt);

peer_id = udp_mux_skt->get_peer_id();
rtc_server->insert_into_id_sessions(peer_id, this);
Expand Down Expand Up @@ -983,17 +1009,21 @@ block +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

srs_error_t SrsRtcSession::send_client_hello(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;

if (dtls_session == NULL) {
dtls_session = new SrsDtlsSession(this);
}

dtls_session->send_client_hello(udp_mux_skt);

return err;
}

void SrsRtcSession::on_connection_established(SrsUdpMuxSocket* udp_mux_skt)
srs_error_t SrsRtcSession::on_connection_established(SrsUdpMuxSocket* udp_mux_skt)
{
srs_trace("rtc session=%s, connection established", id().c_str());
start_play(udp_mux_skt);
return start_play(udp_mux_skt);
}

srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* udp_mux_skt)
Expand All @@ -1002,7 +1032,9 @@ srs_error_t SrsRtcSession::start_play(SrsUdpMuxSocket* udp_mux_skt)

srs_freep(strd);
strd = new SrsRtcSenderThread(this, udp_mux_skt, _srs_context->get_id());
strd->start();
if ((err = strd->start()) != srs_success) {
return srs_error_wrap(err, "start SrsRtcSenderThread");
}

return err;
}
Expand All @@ -1016,12 +1048,12 @@ srs_error_t SrsRtcSession::on_rtp(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;
if (dtls_session == NULL) {
return srs_error_wrap(err, "recv unexpect rtp packet before dtls done");
return srs_error_new(ERROR_RTC_RTP, "recv unexpect rtp packet before dtls done");
}

char unprotected_buf[1460];
int nb_unprotected_buf = udp_mux_skt->size();
if (dtls_session->unprotect_rtp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) {
if ((err = dtls_session->unprotect_rtp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtp unprotect failed");
}

Expand Down Expand Up @@ -1066,12 +1098,12 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
srs_error_t err = srs_success;

if (dtls_session == NULL) {
return srs_error_wrap(err, "recv unexpect rtcp packet before dtls done");
return srs_error_new(ERROR_RTC_RTCP, "recv unexpect rtp packet before dtls done");
}

char unprotected_buf[1460];
int nb_unprotected_buf = udp_mux_skt->size();
if (dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf) != srs_success) {
if ((err = dtls_session->unprotect_rtcp(unprotected_buf, udp_mux_skt->data(), nb_unprotected_buf)) != srs_success) {
return srs_error_wrap(err, "rtcp unprotect failed");
}

Expand All @@ -1084,7 +1116,7 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
int length = (length_4bytes + 1) * 4;

if (length > nb_unprotected_buf) {
return srs_error_wrap(err, "invalid rtcp packet, length=%u", length);
return srs_error_new(ERROR_RTC_RTCP, "invalid rtcp packet, length=%u", length);
}

srs_verbose("on rtcp, payload_type=%u", payload_type);
Expand Down Expand Up @@ -1120,8 +1152,8 @@ srs_error_t SrsRtcSession::on_rtcp(SrsUdpMuxSocket* udp_mux_skt)
}
}

if (err != srs_success) {
return err;
if (err != srs_success) {
return srs_error_wrap(err, "rtcp");
}

ph += length;
Expand Down Expand Up @@ -1156,8 +1188,6 @@ srs_error_t SrsRtcServer::initialize()

srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt)
{
srs_error_t err = srs_success;

if (is_stun(reinterpret_cast<const uint8_t*>(udp_mux_skt->data()), udp_mux_skt->size())) {
return on_stun(udp_mux_skt);
} else if (is_dtls(reinterpret_cast<const uint8_t*>(udp_mux_skt->data()), udp_mux_skt->size())) {
Expand All @@ -1166,7 +1196,7 @@ srs_error_t SrsRtcServer::on_udp_packet(SrsUdpMuxSocket* udp_mux_skt)
return on_rtp_or_rtcp(udp_mux_skt);
}

return srs_error_wrap(err, "unknown udp packet type");
return srs_error_new(ERROR_RTC_UDP, "unknown udp packet type");
}

SrsRtcSession* SrsRtcServer::create_rtc_session(const SrsRequest& req, const SrsSdp& remote_sdp, SrsSdp& local_sdp)
Expand Down Expand Up @@ -1213,14 +1243,14 @@ srs_error_t SrsRtcServer::on_stun(SrsUdpMuxSocket* udp_mux_skt)
srs_verbose("recv stun packet from %s", udp_mux_skt->get_peer_id().c_str());

SrsStunPacket stun_req;
if (stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size()) != srs_success) {
if ((err = stun_req.decode(udp_mux_skt->data(), udp_mux_skt->size())) != srs_success) {
return srs_error_wrap(err, "decode stun packet failed");
}

std::string username = stun_req.get_username();
SrsRtcSession* rtc_session = find_rtc_session_by_username(username);
if (rtc_session == NULL) {
return srs_error_wrap(err, "can not find rtc_session, stun username=%s", username.c_str());
return srs_error_new(ERROR_RTC_STUN, "can not find rtc_session, stun username=%s", username.c_str());
}

return rtc_session->on_stun(udp_mux_skt, &stun_req);
Expand All @@ -1233,12 +1263,10 @@ srs_error_t SrsRtcServer::on_dtls(SrsUdpMuxSocket* udp_mux_skt)
SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id());

if (rtc_session == NULL) {
return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str());
return srs_error_new(ERROR_RTC_DTLS, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str());
}

rtc_session->on_dtls(udp_mux_skt);

return err;
return rtc_session->on_dtls(udp_mux_skt);
}

srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt)
Expand All @@ -1248,13 +1276,13 @@ srs_error_t SrsRtcServer::on_rtp_or_rtcp(SrsUdpMuxSocket* udp_mux_skt)
SrsRtcSession* rtc_session = find_rtc_session_by_peer_id(udp_mux_skt->get_peer_id());

if (rtc_session == NULL) {
return srs_error_wrap(err, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str());
return srs_error_new(ERROR_RTC_RTP, "can not find rtc session by peer_id=%s", udp_mux_skt->get_peer_id().c_str());
}

if (is_rtcp(reinterpret_cast<const uint8_t*>(udp_mux_skt->data()), udp_mux_skt->size())) {
rtc_session->on_rtcp(udp_mux_skt);
err = rtc_session->on_rtcp(udp_mux_skt);
} else {
rtc_session->on_rtp(udp_mux_skt);
err = rtc_session->on_rtp(udp_mux_skt);
}

return err;
Expand Down
4 changes: 2 additions & 2 deletions trunk/src/app/srs_app_rtc_conn.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class SrsDtlsSession
srs_error_t on_dtls_handshake_done(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_dtls_application_data(const char* data, const int len);

void send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
public:
srs_error_t protect_rtp(char* protected_buf, const char* ori_buf, int& nb_protected_buf);
srs_error_t unprotect_rtp(char* unprotected_buf, const char* ori_buf, int& nb_unprotected_buf);
Expand Down Expand Up @@ -221,7 +221,7 @@ class SrsRtcSession
srs_error_t on_rtcp(SrsUdpMuxSocket* udp_mux_skt);
public:
srs_error_t send_client_hello(SrsUdpMuxSocket* udp_mux_skt);
void on_connection_established(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t on_connection_established(SrsUdpMuxSocket* udp_mux_skt);
srs_error_t start_play(SrsUdpMuxSocket* udp_mux_skt);
public:
bool is_stun_timeout() { return last_stun_time + kSrsRtcSessionStunTimeoutUs < srs_get_system_time(); }
Expand Down
Loading