Skip to content

Commit 027afb7

Browse files
committed
Per-message deflate as described in RFC 7692
Github issue #5 For my work-load (somewhat redundant JSON) it reduces the transmitted message sizes to nearly 1/10th their uncompressed sizes so it will be a huge bandwidth saver. It basically does the bare minimum to be RFC 7692 compatible. I have tested the compression with chromium and websocketpp 0.7.0 and seasocks seems to compress/decompress messages properly in both directions. Current limitations: - Can't configure the *_no_context_takeover or *_max_window_bits extensions. It will only negotiate the default (context takeover and max window bits in both directions) - All transmitted messages will be compressed. Ideally you should be able to opt-out of compressing particular messages (but it does support clients that compress particular messages) - Only ever issues one DEFLATE block per message since the seasocks API doesn't support incremental message construction (but I believe it does support receiving multiple blocks from clients that do) - Various tuning parameters such as buffer size and memLevel are hard-coded. Some apps might like to customise them - Requires linking to zlib even if no compression will be used - No tests :(
1 parent cc1c68a commit 027afb7

File tree

11 files changed

+215
-9
lines changed

11 files changed

+215
-9
lines changed

CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ if( NOT PYTHON_BIN )
3434
endif()
3535

3636

37+
find_package(ZLIB REQUIRED)
38+
39+
3740
add_subdirectory("src/main/c")
3841
add_subdirectory("src/main/web")
3942
add_subdirectory("src/app/c")
@@ -43,4 +46,3 @@ if (UNITTESTS)
4346
enable_testing()
4447
add_subdirectory("src/test/c")
4548
endif ()
46-

src/app/c/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
macro(add_app _NAME)
22
add_executable(${_NAME} ${_NAME}.cpp)
3-
target_link_libraries(${_NAME} seasocks)
3+
target_link_libraries(${_NAME} seasocks "${ZLIB_LIBRARIES}")
44
endmacro()
55

66
add_app(ph_test)

src/main/c/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ target_include_directories(seasocks PUBLIC .)
2828
target_link_libraries(seasocks PRIVATE ${CMAKE_THREAD_LIBS_INIT} embedded)
2929

3030
add_library(seasocks_so SHARED $<TARGET_OBJECTS:seasocks_obj>)
31-
target_include_directories(seasocks_so PUBLIC .)
32-
target_link_libraries(seasocks_so PRIVATE ${CMAKE_THREAD_LIBS_INIT} embedded)
31+
target_include_directories(seasocks_so PUBLIC ${ZLIB_INCLUDE_DIRS} .)
32+
target_link_libraries(seasocks_so PRIVATE ${CMAKE_THREAD_LIBS_INIT} embedded "${ZLIB_LIBRARIES}")
3333
set_target_properties(seasocks_so PROPERTIES OUTPUT_NAME seasocks)
3434

3535
install(TARGETS seasocks seasocks_so

src/main/c/Connection.cpp

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
#include <unistd.h>
6060
#include <byteswap.h>
6161
#include <unordered_map>
62+
#include <memory>
6263

6364
namespace {
6465

@@ -570,7 +571,22 @@ void Connection::send(const uint8_t* data, size_t length) {
570571

571572
void Connection::sendHybi(uint8_t opcode, const uint8_t* webSocketResponse, size_t messageLength) {
572573
uint8_t firstByte = 0x80 | opcode;
574+
if (_perMessageDeflate) firstByte |= 0x40;
573575
if (!write(&firstByte, 1, false)) return;
576+
577+
if (_perMessageDeflate) {
578+
std::vector<uint8_t> compressed;
579+
580+
zlibContext->deflate(webSocketResponse, messageLength, compressed);
581+
582+
LS_DEBUG(_logger, "Compression result: " << messageLength << " bytes -> " << compressed.size() << " bytes");
583+
sendHybiData(compressed.data(), compressed.size());
584+
} else {
585+
sendHybiData(webSocketResponse, messageLength);
586+
}
587+
}
588+
589+
void Connection::sendHybiData(const uint8_t* webSocketResponse, size_t messageLength) {
574590
if (messageLength < 126) {
575591
uint8_t nextByte = messageLength; // No MASK bit set.
576592
if (!write(&nextByte, 1, false)) return;
@@ -637,7 +653,38 @@ void Connection::handleHybiWebSocket() {
637653
bool done = false;
638654
while (!done) {
639655
std::vector<uint8_t> decodedMessage;
640-
switch (decoder.decodeNextMessage(decodedMessage)) {
656+
bool deflateNeeded = false;
657+
658+
auto messageState = decoder.decodeNextMessage(decodedMessage, deflateNeeded);
659+
660+
if (deflateNeeded) {
661+
if (!_perMessageDeflate) {
662+
LS_WARNING(_logger, "Received deflated hybi frame but deflate wasn't negotiated");
663+
closeInternal();
664+
return;
665+
}
666+
667+
size_t compressed_size = decodedMessage.size();
668+
669+
std::vector<uint8_t> decompressed;
670+
int zlibError;
671+
672+
// Note: inflate() alters decodedMessage
673+
bool success = zlibContext->inflate(decodedMessage, decompressed, zlibError);
674+
675+
if (!success) {
676+
LS_WARNING(_logger, "Decompression error from zlib: " << zlibError);
677+
closeInternal();
678+
return;
679+
}
680+
681+
LS_DEBUG(_logger, "Decompression result: " << compressed_size << " bytes -> " << decodedMessage.size() << " bytes");
682+
683+
decodedMessage.swap(decompressed);
684+
}
685+
686+
687+
switch (messageState) {
641688
default:
642689
closeInternal();
643690
LS_WARNING(_logger, "Unknown HybiPacketDecoder state");
@@ -809,6 +856,10 @@ bool Connection::processHeaders(uint8_t* first, uint8_t* last) {
809856
return send404();
810857
}
811858
verb = Request::Verb::WebSocket;
859+
860+
if (_server.server().getPerMessageDeflateEnabled() && headers.count("Sec-WebSocket-Extensions")) {
861+
parsePerMessageDeflateHeader(headers["Sec-WebSocket-Extensions"]);
862+
}
812863
}
813864

814865
_request.reset(new PageRequest(_address, requestUri, _server.server(),
@@ -975,6 +1026,7 @@ bool Connection::handleHybiHandshake(
9751026
bufferLine("Upgrade: websocket");
9761027
bufferLine("Connection: Upgrade");
9771028
bufferLine("Sec-WebSocket-Accept: " + getAcceptKey(webSocketKey));
1029+
if (_perMessageDeflate) bufferLine("Sec-WebSocket-Extensions: permessage-deflate");
9781030
bufferLine("");
9791031
flush();
9801032

@@ -985,6 +1037,19 @@ bool Connection::handleHybiHandshake(
9851037
return true;
9861038
}
9871039

1040+
void Connection::parsePerMessageDeflateHeader(const std::string& header) {
1041+
for (auto &extField : seasocks::split(header, ';')) {
1042+
while (!extField.empty() && isspace(extField[0]))
1043+
extField = extField.substr(1);
1044+
1045+
if (seasocks::caseInsensitiveSame(extField, "permessage-deflate")) {
1046+
LS_INFO(_logger, "Enabling per-message deflate");
1047+
_perMessageDeflate = true;
1048+
zlibContext = std::unique_ptr<ZlibContext>(new ZlibContext());
1049+
}
1050+
}
1051+
}
1052+
9881053
bool Connection::parseRange(const std::string& rangeStr, Range& range) const {
9891054
size_t minusPos = rangeStr.find('-');
9901055
if (minusPos == std::string::npos) {

src/main/c/HybiPacketDecoder.cpp

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ HybiPacketDecoder::HybiPacketDecoder(Logger& logger,
3939
}
4040

4141
HybiPacketDecoder::MessageState HybiPacketDecoder::decodeNextMessage(
42-
std::vector<uint8_t>& messageOut) {
42+
std::vector<uint8_t>& messageOut, bool& deflateNeeded) {
4343
if (_messageStart + 1 >= _buffer.size()) {
4444
return MessageState::NoMessage;
4545
}
@@ -49,10 +49,15 @@ HybiPacketDecoder::MessageState HybiPacketDecoder::decodeNextMessage(
4949
LS_WARNING(&_logger, "Received hybi frame without FIN bit set - unsupported");
5050
return MessageState::Error;
5151
}
52-
if ((_buffer[_messageStart] & (7<<4)) != 0) {
52+
53+
auto reservedBits = _buffer[_messageStart] & (7<<4);
54+
if ((reservedBits & 0x30) != 0) {
5355
LS_WARNING(&_logger, "Received hybi frame with reserved bits set - error");
5456
return MessageState::Error;
5557
}
58+
59+
deflateNeeded = !!(reservedBits & 0x40);
60+
5661
auto opcode = static_cast<Opcode>(_buffer[_messageStart] & 0xf);
5762
size_t payloadLength = _buffer[_messageStart + 1] & 0x7fu;
5863
auto maskBit = _buffer[_messageStart + 1] & 0x80;

src/main/c/Server.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,11 @@ void Server::setMaxKeepAliveDrops(int maxKeepAliveDrops) {
592592
_maxKeepAliveDrops = maxKeepAliveDrops;
593593
}
594594

595+
void Server::setPerMessageDeflateEnabled(bool enabled) {
596+
LS_INFO(_logger, "Setting per-message deflate to " << (enabled ? "enabled" : "disabled"));
597+
_perMessageDeflateEnabled = enabled;
598+
}
599+
595600
void Server::checkThread() const {
596601
auto thisTid = gettid();
597602
if (thisTid != _threadId) {

src/main/c/internal/HybiPacketDecoder.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ class HybiPacketDecoder {
5959
Pong,
6060
Close
6161
};
62-
MessageState decodeNextMessage(std::vector<uint8_t>& messageOut);
62+
MessageState decodeNextMessage(std::vector<uint8_t>& messageOut, bool& deflateNeeded);
63+
MessageState decodeNextMessage(std::vector<uint8_t>& messageOut) {
64+
bool ignore;
65+
return decodeNextMessage(messageOut, ignore);
66+
}
6367

6468
size_t numBytesDecoded() const;
6569
};

src/main/c/internal/ZlibContext.h

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
#pragma once
2+
3+
#include <zlib.h>
4+
5+
#include <vector>
6+
#include <stdexcept>
7+
8+
9+
namespace seasocks {
10+
11+
class ZlibContext {
12+
public:
13+
ZlibContext(const ZlibContext&) = delete;
14+
ZlibContext& operator=(const ZlibContext&) = delete;
15+
16+
ZlibContext(int deflateBits=15, int inflateBits=15, int memLevel=6) {
17+
int ret;
18+
19+
deflateStream.zalloc = Z_NULL;
20+
deflateStream.zfree = Z_NULL;
21+
deflateStream.opaque = Z_NULL;
22+
23+
ret = ::deflateInit2(
24+
&deflateStream,
25+
Z_DEFAULT_COMPRESSION,
26+
Z_DEFLATED,
27+
deflateBits * -1,
28+
memLevel,
29+
Z_DEFAULT_STRATEGY
30+
);
31+
32+
if (ret != Z_OK) {
33+
throw std::runtime_error("error initialising zlib deflater");
34+
}
35+
36+
inflateStream.zalloc = Z_NULL;
37+
inflateStream.zfree = Z_NULL;
38+
inflateStream.opaque = Z_NULL;
39+
inflateStream.avail_in = 0;
40+
inflateStream.next_in = Z_NULL;
41+
42+
ret = ::inflateInit2(
43+
&inflateStream,
44+
inflateBits * -1
45+
);
46+
47+
if (ret != Z_OK) {
48+
::deflateEnd(&deflateStream);
49+
throw std::runtime_error("error initialising zlib inflater");
50+
}
51+
52+
streamsInitialised = true;
53+
}
54+
55+
~ZlibContext() {
56+
if (!streamsInitialised) return;
57+
::deflateEnd(&deflateStream);
58+
::inflateEnd(&inflateStream);
59+
}
60+
61+
void deflate(const uint8_t* input, size_t inputLen, std::vector<uint8_t>& output) {
62+
deflateStream.next_in = (unsigned char *)input;
63+
deflateStream.avail_in = inputLen;
64+
65+
do {
66+
deflateStream.next_out = buffer;
67+
deflateStream.avail_out = sizeof(buffer);
68+
69+
(void) ::deflate(&deflateStream, Z_SYNC_FLUSH);
70+
71+
output.insert(output.end(), buffer, buffer + sizeof(buffer) - deflateStream.avail_out);
72+
} while (deflateStream.avail_out == 0);
73+
74+
// Remove 4-byte tail end prior to transmission (see RFC 7692, section 7.2.1)
75+
output.resize(output.size() - 4);
76+
}
77+
78+
// WARNING: inflate() alters input
79+
bool inflate(std::vector<uint8_t>& input, std::vector<uint8_t>& output, int& zlibError) {
80+
// Append 4 octets prior to decompression (see RFC 7692, section 7.2.2)
81+
uint8_t tail_end[4] = {0x00, 0x00, 0xff, 0xff};
82+
input.insert(input.end(), tail_end, tail_end + 4);
83+
84+
inflateStream.next_in = input.data();
85+
inflateStream.avail_in = input.size();
86+
87+
do {
88+
inflateStream.next_out = buffer;
89+
inflateStream.avail_out = sizeof(buffer);
90+
91+
int ret = ::inflate(&inflateStream, Z_SYNC_FLUSH);
92+
93+
if (ret != Z_OK && ret != Z_STREAM_END) {
94+
zlibError = ret;
95+
return false;
96+
}
97+
98+
output.insert(output.end(), buffer, buffer + sizeof(buffer) - inflateStream.avail_out);
99+
} while (inflateStream.avail_out == 0);
100+
101+
return true;
102+
}
103+
104+
private:
105+
z_stream deflateStream;
106+
z_stream inflateStream;
107+
bool streamsInitialised = false;
108+
uint8_t buffer[16384];
109+
};
110+
111+
} // namespace seasocks

src/main/c/seasocks/Connection.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
#include "seasocks/ResponseWriter.h"
3131
#include "seasocks/TransferEncoding.h"
3232

33+
#include "internal/ZlibContext.h"
34+
3335
#include <netinet/in.h>
3436

3537
#include <sys/socket.h>
@@ -126,6 +128,8 @@ class Connection : public WebSocket {
126128

127129
void sendHybi(uint8_t opcode, const uint8_t* webSocketResponse,
128130
size_t messageLength);
131+
void sendHybiData(const uint8_t* webSocketResponse, size_t messageLength);
132+
129133

130134
bool sendResponse(std::shared_ptr<Response> response);
131135

@@ -177,6 +181,10 @@ class Connection : public WebSocket {
177181
unsigned _chunk;
178182
std::shared_ptr<Writer> _writer;
179183

184+
void parsePerMessageDeflateHeader(const std::string& header);
185+
bool _perMessageDeflate = false;
186+
std::unique_ptr<ZlibContext> zlibContext;
187+
180188
enum class State {
181189
INVALID,
182190
READING_HEADERS,

src/main/c/seasocks/Server.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,9 @@ class Server : private ServerImpl {
119119
void setClientBufferSize(size_t bytesToBuffer);
120120
size_t clientBufferSize() const override { return _clientBufferSize; }
121121

122+
void setPerMessageDeflateEnabled(bool enabled);
123+
bool getPerMessageDeflateEnabled() { return _perMessageDeflateEnabled;}
124+
122125
class Runnable {
123126
public:
124127
virtual ~Runnable() {}
@@ -166,6 +169,9 @@ class Server : private ServerImpl {
166169
size_t _clientBufferSize;
167170
time_t _nextDeadConnectionCheck;
168171

172+
// Compression settings
173+
bool _perMessageDeflateEnabled = false;
174+
169175
struct WebSocketHandlerEntry {
170176
std::shared_ptr<WebSocket::Handler> handler;
171177
bool allowCrossOrigin;

0 commit comments

Comments
 (0)