diff --git a/README.md b/README.md
index 1ac7efbdd7..b521f73d52 100755
--- a/README.md
+++ b/README.md
@@ -242,6 +242,7 @@ Supported operating systems and hardware:
* 2013-10-17, Created.
## History
+* v2.0, 2014-11-08, fix [#194](https://github.com/winlinvip/simple-rtmp-server/issues/194), writev multiple msgs, support 6k+ 250kbps clients. 2.0.15.
* v2.0, 2014-11-08, fix [#194](https://github.com/winlinvip/simple-rtmp-server/issues/194), optmized st for timeout recv. pulse to 500ms. 2.0.14.
* v2.0, 2014-11-08, fix [#195](https://github.com/winlinvip/simple-rtmp-server/issues/195), remove the confuse code st_usleep(0). 2.0.13.
* v2.0, 2014-11-08, fix [#191](https://github.com/winlinvip/simple-rtmp-server/issues/191), configure --export-librtmp-project and --export-librtmp-single. 2.0.11.
@@ -449,6 +450,7 @@ Performance benchmark history, on virtual box:
* 2014-11-11, SRS 1.0.5, 2700clients, 85%CPU, 66MB. (1.0 equals 2.0.12)
* 2014-11-12, SRS 2.0.14, 2700clients, 69%CPU, 59MB.
* 2014-11-12, SRS 2.0.14, 3500clients, 95%CPU, 78MB.
+* 2014-11-13, SRS 2.0.15, 6000clients, 82%CPU, 203MB.
Latest benchmark(2014-07-12):
diff --git a/trunk/src/app/srs_app_rtmp_conn.cpp b/trunk/src/app/srs_app_rtmp_conn.cpp
index e9bcf5f94d..acd302059e 100644
--- a/trunk/src/app/srs_app_rtmp_conn.cpp
+++ b/trunk/src/app/srs_app_rtmp_conn.cpp
@@ -525,10 +525,7 @@ int SrsRtmpConn::playing(SrsSource* source)
int64_t starttime = -1;
while (true) {
- // collect elapse for pithy print.
- pithy_print.elapse();
-
- // to use isolate thread to recv, can improve about 5% performance.
+ // TODO: to use isolate thread to recv, can improve about 5% performance.
// @see: https://github.com/winlinvip/simple-rtmp-server/issues/196
// read from client.
if (true) {
@@ -539,6 +536,7 @@ int SrsRtmpConn::playing(SrsSource* source)
if (ret == ERROR_SOCKET_TIMEOUT) {
// it's ok, do nothing.
ret = ERROR_SUCCESS;
+ srs_verbose("recv timeout, ignore. ret=%d", ret);
} else if (ret != ERROR_SUCCESS) {
if (!srs_is_client_gracefully_close(ret)) {
srs_error("recv client control message failed. ret=%d", ret);
@@ -554,6 +552,9 @@ int SrsRtmpConn::playing(SrsSource* source)
}
}
+ // collect elapse for pithy print.
+ pithy_print.elapse();
+
// get messages from consumer.
int count = 0;
if ((ret = consumer->dump_packets(msgs.size, msgs.msgs, count)) != ERROR_SUCCESS) {
@@ -568,22 +569,16 @@ int SrsRtmpConn::playing(SrsSource* source)
" time=%"PRId64", msgs=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d",
pithy_print.age(), count,
kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),
- kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m());
+ kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m()
+ );
}
- // sendout messages
- // @remark, becareful, all msgs must be free explicitly,
- // free by send_and_free_message or srs_freep.
- for (int i = 0; i < count; i++) {
- SrsSharedPtrMessage* msg = msgs.msgs[i];
-
- // the send_message will free the msg,
- // so set the msgs[i] to NULL.
- msgs.msgs[i] = NULL;
-
- // only when user specifies the duration,
- // we start to collect the durations for each message.
- if (user_specified_duration_to_stop) {
+ // only when user specifies the duration,
+ // we start to collect the durations for each message.
+ if (user_specified_duration_to_stop) {
+ for (int i = 0; i < count; i++) {
+ SrsSharedPtrMessage* msg = msgs.msgs[i];
+
// foreach msg, collect the duration.
// @remark: never use msg when sent it, for the protocol sdk will free it.
if (starttime < 0 || starttime > msg->header.timestamp) {
@@ -592,12 +587,23 @@ int SrsRtmpConn::playing(SrsSource* source)
duration += msg->header.timestamp - starttime;
starttime = msg->header.timestamp;
}
-
+ }
+
+ // sendout messages
+ // @remark, becareful, all msgs must be free explicitly,
+ // free by send_and_free_message or srs_freep.
+ if (count > 0) {
// no need to assert msg, for the rtmp will assert it.
- if ((ret = rtmp->send_and_free_message(msg, res->stream_id)) != ERROR_SUCCESS) {
- srs_error("send message to client failed. ret=%d", ret);
- return ret;
- }
+ ret = rtmp->send_and_free_messages(msgs.msgs, count, res->stream_id);
+ }
+ for (int i = 0; i < count; i++) {
+ // the send_message will free the msg,
+ // so set the msgs[i] to NULL.
+ msgs.msgs[i] = NULL;
+ }
+ if (ret != ERROR_SUCCESS) {
+ srs_error("send messages to client failed. ret=%d", ret);
+ return ret;
}
// if duration specified, and exceed it, stop play live.
diff --git a/trunk/src/core/srs_core.hpp b/trunk/src/core/srs_core.hpp
index 9d6dbc9e12..78625cbf74 100644
--- a/trunk/src/core/srs_core.hpp
+++ b/trunk/src/core/srs_core.hpp
@@ -31,7 +31,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// current release version
#define VERSION_MAJOR 2
#define VERSION_MINOR 0
-#define VERSION_REVISION 14
+#define VERSION_REVISION 15
// server info.
#define RTMP_SIG_SRS_KEY "SRS"
#define RTMP_SIG_SRS_ROLE "origin/edge server"
diff --git a/trunk/src/kernel/srs_kernel_consts.hpp b/trunk/src/kernel/srs_kernel_consts.hpp
index defc2d556c..4bb5513116 100644
--- a/trunk/src/kernel/srs_kernel_consts.hpp
+++ b/trunk/src/kernel/srs_kernel_consts.hpp
@@ -97,6 +97,19 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// always use fmt0 as cache.
//#define SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE 5
+/**
+* for performance issue,
+* the iovs cache, @see https://github.com/winlinvip/simple-rtmp-server/issues/194
+* iovs cache for multiple messages for each connections.
+*/
+#define SRS_CONSTS_IOVS_MAX 1024
+/**
+* for performance issue,
+* the c0c3 cache, @see https://github.com/winlinvip/simple-rtmp-server/issues/194
+* c0c3 cache for multiple messages for each connections.
+*/
+#define SRS_CONSTS_C0C3_HEADERS_MAX 4096
+
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////
diff --git a/trunk/src/rtmp/srs_protocol_rtmp.cpp b/trunk/src/rtmp/srs_protocol_rtmp.cpp
index 304b59a788..76a0b2cb18 100644
--- a/trunk/src/rtmp/srs_protocol_rtmp.cpp
+++ b/trunk/src/rtmp/srs_protocol_rtmp.cpp
@@ -771,6 +771,11 @@ int SrsRtmpServer::send_and_free_message(SrsMessage* msg, int stream_id)
return protocol->send_and_free_message(msg, stream_id);
}
+int SrsRtmpServer::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
+{
+ return protocol->send_and_free_messages(msgs, nb_msgs, stream_id);
+}
+
int SrsRtmpServer::send_and_free_packet(SrsPacket* packet, int stream_id)
{
return protocol->send_and_free_packet(packet, stream_id);
diff --git a/trunk/src/rtmp/srs_protocol_rtmp.hpp b/trunk/src/rtmp/srs_protocol_rtmp.hpp
index fcf224f306..e6dcb0b513 100644
--- a/trunk/src/rtmp/srs_protocol_rtmp.hpp
+++ b/trunk/src/rtmp/srs_protocol_rtmp.hpp
@@ -368,6 +368,15 @@ class SrsRtmpServer
*/
virtual int send_and_free_message(SrsMessage* msg, int stream_id);
/**
+ * send the RTMP message and always free it.
+ * user must never free or use the msg after this method,
+ * for it will always free the msg.
+ * @param msgs, the msgs to send out, never be NULL.
+ * @param nb_msgs, the size of msgs to send out.
+ * @param stream_id, the stream id of packet to send over, 0 for control message.
+ */
+ virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
+ /**
* send the RTMP packet and always free it.
* user must never free or use the packet after this method,
* for it will always free the packet.
diff --git a/trunk/src/rtmp/srs_protocol_stack.cpp b/trunk/src/rtmp/srs_protocol_stack.cpp
index 47255cd86e..8f2bb11a2e 100644
--- a/trunk/src/rtmp/srs_protocol_stack.cpp
+++ b/trunk/src/rtmp/srs_protocol_stack.cpp
@@ -29,6 +29,7 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#include
#include
+#include
using namespace std;
// when got a messae header, there must be some data,
@@ -404,7 +405,15 @@ SrsProtocol::SrsProtocol(ISrsProtocolReaderWriter* io)
in_buffer = new SrsBuffer();
skt = io;
- in_chunk_size = out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
+ in_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
+ out_chunk_size = SRS_CONSTS_RTMP_PROTOCOL_CHUNK_SIZE;
+
+ nb_out_iovs = SRS_CONSTS_IOVS_MAX;
+ out_iovs = (iovec*)malloc(sizeof(iovec) * nb_out_iovs);
+ // each chunk consumers atleast 2 iovs
+ srs_assert(nb_out_iovs >= 2);
+
+ warned_c0c3_caches = false;
}
SrsProtocol::~SrsProtocol()
@@ -421,6 +430,12 @@ SrsProtocol::~SrsProtocol()
}
srs_freep(in_buffer);
+
+ // alloc by malloc, use free directly.
+ if (out_iovs) {
+ free(out_iovs);
+ out_iovs = NULL;
+ }
}
void SrsProtocol::set_recv_timeout(int64_t timeout_us)
@@ -560,7 +575,7 @@ int SrsProtocol::do_send_message(SrsMessage* msg)
// always has header
int nbh = 0;
char* header = NULL;
- generate_chunk_header(&msg->header, p == msg->payload, &nbh, &header);
+ generate_chunk_header(out_c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
srs_assert(nbh > 0);
// header iov
@@ -590,10 +605,130 @@ int SrsProtocol::do_send_message(SrsMessage* msg)
return ret;
}
-void SrsProtocol::generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
+int SrsProtocol::do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs)
{
- char* cache = out_c0c3_cache;
+ int ret = ERROR_SUCCESS;
+
+ // TODO: FIXME: use cache system instead.
+ int iov_index = 0;
+ iovec* iov = out_iovs + iov_index;
+
+ int c0c3_cache_index = 0;
+ char* c0c3_cache = out_c0c3_caches + c0c3_cache_index;
+
+ // try to send use the c0c3 header cache,
+ // if cache is consumed, try another loop.
+ for (int i = 0; i < nb_msgs; i++) {
+ SrsMessage* msg = msgs[i];
+
+ // ignore empty message.
+ if (!msg->payload || msg->size <= 0) {
+ srs_info("ignore empty message.");
+ continue;
+ }
+
+ // we donot use the complex basic header,
+ // ensure the basic header is 1bytes.
+ if (msg->header.perfer_cid < 2) {
+ srs_warn("change the chunk_id=%d to default=%d",
+ msg->header.perfer_cid, RTMP_CID_ProtocolControl);
+ msg->header.perfer_cid = RTMP_CID_ProtocolControl;
+ }
+
+ // p set to current write position,
+ // it's ok when payload is NULL and size is 0.
+ char* p = msg->payload;
+ char* pend = msg->payload + msg->size;
+
+ // always write the header event payload is empty.
+ while (p < pend) {
+ // always has header
+ int nbh = 0;
+ char* header = NULL;
+ generate_chunk_header(c0c3_cache, &msg->header, p == msg->payload, &nbh, &header);
+ srs_assert(nbh > 0);
+
+ // header iov
+ iov[0].iov_base = header;
+ iov[0].iov_len = nbh;
+
+ // payload iov
+ int payload_size = pend - p;
+ if (payload_size > out_chunk_size) {
+ payload_size = out_chunk_size;
+ }
+ iov[1].iov_base = p;
+ iov[1].iov_len = payload_size;
+
+ // consume sendout bytes.
+ p += payload_size;
+
+ // realloc the iovs if exceed,
+ // for we donot know how many messges maybe to send entirely,
+ // we just alloc the iovs, it's ok.
+ if (iov_index >= nb_out_iovs - 2) {
+ nb_out_iovs += SRS_CONSTS_IOVS_MAX;
+ int realloc_size = sizeof(iovec) * nb_out_iovs;
+ out_iovs = (iovec*)realloc(out_iovs, realloc_size);
+ }
+
+ // to next pair of iovs
+ iov_index += 2;
+ iov = out_iovs + iov_index;
+
+ // to next c0c3 header cache
+ c0c3_cache_index += nbh;
+ c0c3_cache = out_c0c3_caches + c0c3_cache_index;
+
+ // the cache header should never be realloc again,
+ // for the ptr is set to iovs, so we just warn user to set larger
+ // and use another loop to send again.
+ int c0c3_left = SRS_CONSTS_C0C3_HEADERS_MAX - c0c3_cache_index;
+ if (c0c3_left < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) {
+ // only warn once for a connection.
+ if (!warned_c0c3_caches) {
+ srs_warn("c0c3 cache header too small, recoment to %d",
+ SRS_CONSTS_C0C3_HEADERS_MAX + SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE);
+ warned_c0c3_caches = true;
+ }
+
+ // when c0c3 cache dry,
+ // sendout all messages and reset the cache, then send again.
+ if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
+ srs_error("send with writev failed. ret=%d", ret);
+ return ret;
+ }
+ // reset caches, while these cache ensure
+ // atleast we can sendout a chunk.
+ iov_index = 0;
+ iov = out_iovs + iov_index;
+
+ c0c3_cache_index = 0;
+ c0c3_cache = out_c0c3_caches + c0c3_cache_index;
+ }
+ }
+ }
+
+ // maybe the iovs already sendout when c0c3 cache dry,
+ // so just ignore when no iovs to send.
+ if (iov_index <= 0) {
+ return ret;
+ }
+
+ // send by writev
+ // sendout header and payload by writev.
+ // decrease the sys invoke count to get higher performance.
+ if ((ret = skt->writev(out_iovs, iov_index, NULL)) != ERROR_SUCCESS) {
+ srs_error("send with writev failed. ret=%d", ret);
+ return ret;
+ }
+
+ return ret;
+}
+
+void SrsProtocol::generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph)
+{
// to directly set the field.
char* pp = NULL;
@@ -856,6 +991,34 @@ int SrsProtocol::send_and_free_message(SrsMessage* msg, int stream_id)
return ret;
}
+int SrsProtocol::send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id)
+{
+ // always not NULL msg.
+ srs_assert(msgs);
+ srs_assert(nb_msgs > 0);
+
+ // update the stream id in header.
+ for (int i = 0; i < nb_msgs; i++) {
+ SrsMessage* msg = msgs[i];
+ // we assume that the stream_id in a group must be the same.
+ if (msg->header.stream_id == stream_id) {
+ break;
+ }
+ msg->header.stream_id = stream_id;
+ }
+
+ // donot use the auto free to free the msg,
+ // for performance issue.
+ int ret = do_send_messages(msgs, nb_msgs);
+
+ for (int i = 0; i < nb_msgs; i++) {
+ SrsMessage* msg = msgs[i];
+ srs_freep(msg);
+ }
+
+ return ret;
+}
+
int SrsProtocol::send_and_free_packet(SrsPacket* packet, int stream_id)
{
int ret = ERROR_SUCCESS;
diff --git a/trunk/src/rtmp/srs_protocol_stack.hpp b/trunk/src/rtmp/srs_protocol_stack.hpp
index 071f5168f7..2f4914d9cc 100644
--- a/trunk/src/rtmp/srs_protocol_stack.hpp
+++ b/trunk/src/rtmp/srs_protocol_stack.hpp
@@ -47,6 +47,7 @@ class SrsAmf0Any;
class SrsMessageHeader;
class SrsMessage;
class SrsChunkStream;
+class SrsSharedPtrMessage;
/**
* 4.1. Message Header
@@ -221,6 +222,15 @@ class SrsProtocol
*/
iovec out_iov[2];
/**
+ * cache for multiple messages send
+ */
+ iovec* out_iovs;
+ int nb_out_iovs;
+ // the c0c3 cache cannot be realloc.
+ char out_c0c3_caches[SRS_CONSTS_C0C3_HEADERS_MAX];
+ // whether warned user to increase the c0c3 header cache.
+ bool warned_c0c3_caches;
+ /**
* output chunk size, default to 128, set by config.
*/
int32_t out_chunk_size;
@@ -276,6 +286,15 @@ class SrsProtocol
*/
virtual int send_and_free_message(SrsMessage* msg, int stream_id);
/**
+ * send the RTMP message and always free it.
+ * user must never free or use the msg after this method,
+ * for it will always free the msg.
+ * @param msgs, the msgs to send out, never be NULL.
+ * @param nb_msgs, the size of msgs to send out.
+ * @param stream_id, the stream id of packet to send over, 0 for control message.
+ */
+ virtual int send_and_free_messages(SrsSharedPtrMessage** msgs, int nb_msgs, int stream_id);
+ /**
* send the RTMP packet and always free it.
* user must never free or use the packet after this method,
* for it will always free the packet.
@@ -349,6 +368,11 @@ class SrsProtocol
*/
virtual int do_send_message(SrsMessage* msg);
/**
+ * send out the messages, donot free it,
+ * the caller must free the param msgs.
+ */
+ virtual int do_send_messages(SrsSharedPtrMessage** msgs, int nb_msgs);
+ /**
* generate the chunk header for msg.
* @param mh, the header of msg to send.
* @param c0, whether the first chunk, the c0 chunk.
@@ -356,7 +380,7 @@ class SrsProtocol
* @param ph, output the header cache.
* user should never free it, it's cached header.
*/
- virtual void generate_chunk_header(SrsMessageHeader* mh, bool c0, int* pnbh, char** ph);
+ virtual void generate_chunk_header(char* cache, SrsMessageHeader* mh, bool c0, int* pnbh, char** ph);
/**
* imp for decode_message
*/