Skip to content

Commit cd534ad

Browse files
committed
implements batch sending
rather than writing a single message per trip through the underlying transport, writes are batched and sent as a group. This drastically improves the system call / application code ratio and tcp packet utilization rates when sending lots of small messages
1 parent e2b7a4b commit cd534ad

File tree

2 files changed

+68
-33
lines changed

2 files changed

+68
-33
lines changed

websocketpp/connection.hpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2013, Peter Thorson. All rights reserved.
2+
* Copyright (c) 2014, Peter Thorson. All rights reserved.
33
*
44
* Redistribution and use in source and binary forms, with or without
55
* modification, are permitted provided that the following conditions are met:
@@ -1451,9 +1451,9 @@ class connection
14511451
*/
14521452
std::vector<transport::buffer> m_send_buffer;
14531453

1454-
/// a pointer to hold on to the current message being written to keep it
1454+
/// a list of pointers to hold on to the messages being written to keep them
14551455
/// from going out of scope before the write is complete.
1456-
message_ptr m_current_msg;
1456+
std::vector<message_ptr> m_current_msgs;
14571457

14581458
/// True if there is currently an outstanding transport write
14591459
/**

websocketpp/impl/connection_impl.hpp

+65-30
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2013, Peter Thorson. All rights reserved.
2+
* Copyright (c) 2014, Peter Thorson. All rights reserved.
33
*
44
* Redistribution and use in source and binary forms, with or without
55
* modification, are permitted provided that the following conditions are met:
@@ -1562,44 +1562,78 @@ void connection<config>::write_frame() {
15621562
return;
15631563
}
15641564

1565-
// Get the next message in the queue. This will return an empty
1566-
// message if the queue was empty.
1567-
m_current_msg = write_pop();
1568-
1569-
if (!m_current_msg) {
1565+
// pull off all the messages that are ready to write.
1566+
// stop if we get a message marked terminal
1567+
message_ptr next_message = write_pop();
1568+
while (next_message) {
1569+
m_current_msgs.push_back(next_message);
1570+
if (!next_message->get_terminal()) {
1571+
next_message = write_pop();
1572+
} else {
1573+
next_message = message_ptr();
1574+
}
1575+
}
1576+
1577+
if (m_current_msgs.empty()) {
1578+
// there was nothing to send
15701579
return;
1580+
} else {
1581+
// At this point we own the next messages to be sent and are
1582+
// responsible for holding the write flag until they are
1583+
// successfully sent or there is some error
1584+
m_write_flag = true;
15711585
}
1572-
1573-
// At this point we own the next message to be sent and are
1574-
// responsible for holding the write flag until it is successfully
1575-
// sent or there is some error
1576-
m_write_flag = true;
15771586
}
15781587

1579-
std::string const & header = m_current_msg->get_header();
1580-
std::string const & payload = m_current_msg->get_payload();
1581-
1582-
m_send_buffer.push_back(transport::buffer(header.c_str(),header.size()));
1583-
m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size()));
1588+
typename std::vector<message_ptr>::iterator it;
1589+
for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) {
1590+
std::string const & header = (*it)->get_header();
1591+
std::string const & payload = (*it)->get_payload();
15841592

1593+
m_send_buffer.push_back(transport::buffer(header.c_str(),header.size()));
1594+
m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size()));
1595+
}
15851596

1597+
// Print detailed send stats if those log levels are enabled
15861598
if (m_alog.static_test(log::alevel::frame_header)) {
15871599
if (m_alog.dynamic_test(log::alevel::frame_header)) {
1588-
std::stringstream s;
1589-
s << "Dispatching write with " << header.size()
1590-
<< " header bytes and " << payload.size()
1591-
<< " payload bytes" << std::endl;
1592-
m_alog.write(log::alevel::frame_header,s.str());
1593-
m_alog.write(log::alevel::frame_header,"Header: "+utility::to_hex(header));
1594-
}
1595-
}
1596-
if (m_alog.static_test(log::alevel::frame_payload)) {
1597-
if (m_alog.dynamic_test(log::alevel::frame_payload)) {
1598-
m_alog.write(log::alevel::frame_payload,"Payload: "+utility::to_hex(payload));
1600+
std::stringstream general,header,payload;
1601+
1602+
general << "Dispatching write containing " << m_current_msgs.size()
1603+
<<" message(s) containing ";
1604+
header << "Header Bytes: \n";
1605+
payload << "Payload Bytes: \n";
1606+
1607+
size_t hbytes = 0;
1608+
size_t pbytes = 0;
1609+
1610+
for (size_t i = 0; i < m_current_msgs.size(); i++) {
1611+
hbytes += m_current_msgs[i]->get_header().size();
1612+
pbytes += m_current_msgs[i]->get_payload().size();
1613+
1614+
1615+
header << "[" << i << "] ("
1616+
<< m_current_msgs[i]->get_header().size() << ") "
1617+
<< utility::to_hex(m_current_msgs[i]->get_header()) << "\n";
1618+
1619+
if (m_alog.static_test(log::alevel::frame_payload)) {
1620+
if (m_alog.dynamic_test(log::alevel::frame_payload)) {
1621+
payload << "[" << i << "] ("
1622+
<< m_current_msgs[i]->get_payload().size() << ") "
1623+
<< utility::to_hex(m_current_msgs[i]->get_payload())
1624+
<< "\n";
1625+
}
1626+
}
1627+
}
1628+
1629+
general << hbytes << " header bytes and " << pbytes << " payload bytes";
1630+
1631+
m_alog.write(log::alevel::frame_header,general.str());
1632+
m_alog.write(log::alevel::frame_header,header.str());
1633+
m_alog.write(log::alevel::frame_payload,payload.str());
15991634
}
16001635
}
16011636

1602-
16031637
transport_con_type::async_write(
16041638
m_send_buffer,
16051639
m_write_frame_handler
@@ -1613,10 +1647,11 @@ void connection<config>::handle_write_frame(lib::error_code const & ec)
16131647
m_alog.write(log::alevel::devel,"connection handle_write_frame");
16141648
}
16151649

1616-
bool terminal = m_current_msg->get_terminal();
1650+
bool terminal = m_current_msgs.back()->get_terminal();
16171651

16181652
m_send_buffer.clear();
1619-
m_current_msg.reset();
1653+
m_current_msgs.clear();
1654+
// TODO: recycle instead of deleting
16201655

16211656
if (ec) {
16221657
log_err(log::elevel::fatal,"handle_write_frame",ec);

0 commit comments

Comments
 (0)