Skip to content

Commit 6e47c62

Browse files
committed
(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235).
1 parent bcae7f3 commit 6e47c62

File tree

8 files changed

+215
-20
lines changed

8 files changed

+215
-20
lines changed

docs/CHANGELOG.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
# Changelog
2+
23
All changes to this project will be documented in this file.
34

5+
## [10.1.5] - 2020-08-02
6+
7+
(ws) Add a new ws sub-command, push_server. This command runs a server which sends many messages in a loop to a websocket client. We can receive above 200,000 messages per second (cf #235).
8+
49
## [10.1.4] - 2020-08-02
510

6-
(ws) Add a new ws sub-command, echo_client. This command send a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235)
11+
(ws) Add a new ws sub-command, echo_client. This command sends a message to an echo server, and send back to a server whatever message it does receive. When connecting to a local ws echo_server, on my MacBook Pro 2015 I can send/receive around 30,000 messages per second. (cf #235)
712

813
## [10.1.3] - 2020-08-02
914

docs/performance.md

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
2+
## WebSocket Client performance
3+
4+
We will run a client and a server on the same machine, connecting to localhost. This bench is run on a MacBook Pro from 2015. We can receive over 200,000 (small) messages per second, another way to put it is that it takes 5 micro-second to receive and process one message. This is an indication about the minimal latency to receive messages.
5+
6+
### Receiving messages
7+
8+
By using the push_server ws sub-command, the server will send the same message in a loop to any connected client.
9+
10+
```
11+
ws push_server -q --send_msg 'yo'
12+
```
13+
14+
By using the echo_client ws sub-command, with the -m (mute or no_send), we will display statistics on how many messages we can receive per second.
15+
16+
```
17+
$ ws echo_client -m ws://localhost:8008
18+
[2020-08-02 12:31:17.284] [info] ws_echo_client: connected
19+
[2020-08-02 12:31:17.284] [info] Uri: /
20+
[2020-08-02 12:31:17.284] [info] Headers:
21+
[2020-08-02 12:31:17.284] [info] Connection: Upgrade
22+
[2020-08-02 12:31:17.284] [info] Sec-WebSocket-Accept: byy/pMK2d0PtRwExaaiOnXJTQHo=
23+
[2020-08-02 12:31:17.284] [info] Server: ixwebsocket/10.1.4 macos ssl/SecureTransport zlib 1.2.11
24+
[2020-08-02 12:31:17.284] [info] Upgrade: websocket
25+
[2020-08-02 12:31:17.663] [info] messages received: 0 per second 2595307 total
26+
[2020-08-02 12:31:18.668] [info] messages received: 79679 per second 2674986 total
27+
[2020-08-02 12:31:19.668] [info] messages received: 207438 per second 2882424 total
28+
[2020-08-02 12:31:20.673] [info] messages received: 209207 per second 3091631 total
29+
[2020-08-02 12:31:21.676] [info] messages received: 216056 per second 3307687 total
30+
[2020-08-02 12:31:22.680] [info] messages received: 214927 per second 3522614 total
31+
[2020-08-02 12:31:23.684] [info] messages received: 216960 per second 3739574 total
32+
[2020-08-02 12:31:24.688] [info] messages received: 215232 per second 3954806 total
33+
[2020-08-02 12:31:25.691] [info] messages received: 212300 per second 4167106 total
34+
[2020-08-02 12:31:26.694] [info] messages received: 212501 per second 4379607 total
35+
[2020-08-02 12:31:27.699] [info] messages received: 212330 per second 4591937 total
36+
[2020-08-02 12:31:28.702] [info] messages received: 216511 per second 4808448 total
37+
```

ixwebsocket/IXWebSocketVersion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66

77
#pragma once
88

9-
#define IX_WEBSOCKET_VERSION "10.1.4"
9+
#define IX_WEBSOCKET_VERSION "10.1.5"

ws/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ add_executable(ws
5050
ws_http_client.cpp
5151
ws_ping_pong.cpp
5252
ws_broadcast_server.cpp
53+
ws_push_server.cpp
5354
ws_echo_server.cpp
5455
ws_echo_client.cpp
5556
ws_chat.cpp

ws/ws.cpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ int main(int argc, char** argv)
148148
bool version = false;
149149
bool verifyNone = false;
150150
bool disablePong = false;
151+
bool noSend = false;
151152
int port = 8008;
152153
int redisPort = 6379;
153154
int statsdPort = 8125;
@@ -254,6 +255,7 @@ int main(int argc, char** argv)
254255
"--ping_interval", pingIntervalSecs, "Interval between sending pings");
255256
echoClientApp->add_option("--subprotocol", subprotocol, "Subprotocol");
256257
echoClientApp->add_option("--send_msg", sendMsg, "Send message");
258+
echoClientApp->add_flag("-m", noSend, "Do not send messages, only receive messages");
257259
addTLSOptions(echoClientApp);
258260

259261
CLI::App* chatApp = app.add_subcommand("chat", "Group chat");
@@ -272,6 +274,18 @@ int main(int argc, char** argv)
272274
echoServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
273275
addTLSOptions(echoServerApp);
274276

277+
CLI::App* pushServerApp = app.add_subcommand("push_server", "Push server");
278+
pushServerApp->fallthrough();
279+
pushServerApp->add_option("--port", port, "Port");
280+
pushServerApp->add_option("--host", hostname, "Hostname");
281+
pushServerApp->add_flag("-q", quiet, "Quiet / only display warnings and errors");
282+
pushServerApp->add_flag("-g", greetings, "Greet");
283+
pushServerApp->add_flag("-6", ipv6, "IpV6");
284+
pushServerApp->add_flag("-x", disablePerMessageDeflate, "Disable per message deflate");
285+
pushServerApp->add_flag("-p", disablePong, "Disable sending PONG in response to PING");
286+
pushServerApp->add_option("--send_msg", sendMsg, "Send message");
287+
addTLSOptions(pushServerApp);
288+
275289
CLI::App* broadcastServerApp = app.add_subcommand("broadcast_server", "Broadcasting server");
276290
broadcastServerApp->fallthrough();
277291
broadcastServerApp->add_option("--port", port, "Port");
@@ -525,13 +539,25 @@ int main(int argc, char** argv)
525539
tlsOptions,
526540
subprotocol,
527541
pingIntervalSecs,
528-
sendMsg);
542+
sendMsg,
543+
noSend);
529544
}
530545
else if (app.got_subcommand("echo_server"))
531546
{
532547
ret = ix::ws_echo_server_main(
533548
port, greetings, hostname, tlsOptions, ipv6, disablePerMessageDeflate, disablePong);
534549
}
550+
else if (app.got_subcommand("push_server"))
551+
{
552+
ret = ix::ws_push_server(port,
553+
greetings,
554+
hostname,
555+
tlsOptions,
556+
ipv6,
557+
disablePerMessageDeflate,
558+
disablePong,
559+
sendMsg);
560+
}
535561
else if (app.got_subcommand("transfer"))
536562
{
537563
ret = ix::ws_transfer_main(port, hostname, tlsOptions);

ws/ws.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ namespace ix
3535
bool disablePerMessageDeflate,
3636
bool disablePong);
3737

38+
int ws_push_server(int port,
39+
bool greetings,
40+
const std::string& hostname,
41+
const ix::SocketTLSOptions& tlsOptions,
42+
bool ipv6,
43+
bool disablePerMessageDeflate,
44+
bool disablePong,
45+
const std::string& sendMsg);
46+
3847
int ws_broadcast_server_main(int port,
3948
const std::string& hostname,
4049
const ix::SocketTLSOptions& tlsOptions);
@@ -60,7 +69,8 @@ namespace ix
6069
const ix::SocketTLSOptions& tlsOptions,
6170
const std::string& subprotocol,
6271
int pingIntervalSecs,
63-
const std::string& sendMsg);
72+
const std::string& sendMsg,
73+
bool noSend);
6474

6575
int ws_receive_main(const std::string& url,
6676
bool enablePerMessageDeflate,

ws/ws_echo_client.cpp

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ namespace ix
2121
const ix::SocketTLSOptions& tlsOptions,
2222
const std::string& subprotocol,
2323
int pingIntervalSecs,
24-
const std::string& sendMsg)
24+
const std::string& sendMsg,
25+
bool noSend)
2526
{
2627
// Our websocket object
2728
ix::WebSocket webSocket;
@@ -46,26 +47,33 @@ namespace ix
4647

4748
// Setup a callback to be fired (in a background thread, watch out for race conditions !)
4849
// when a message or an event (open, close, error) is received
49-
webSocket.setOnMessageCallback(
50-
[&webSocket, &receivedCount, &sendMsg, binaryMode](const ix::WebSocketMessagePtr& msg) {
51-
if (msg->type == ix::WebSocketMessageType::Message)
50+
webSocket.setOnMessageCallback([&webSocket, &receivedCount, &sendMsg, noSend, binaryMode](
51+
const ix::WebSocketMessagePtr& msg) {
52+
if (msg->type == ix::WebSocketMessageType::Message)
53+
{
54+
if (!noSend)
5255
{
5356
webSocket.send(msg->str, msg->binary);
54-
receivedCount++;
5557
}
56-
else if (msg->type == ix::WebSocketMessageType::Open)
58+
receivedCount++;
59+
}
60+
else if (msg->type == ix::WebSocketMessageType::Open)
61+
{
62+
spdlog::info("ws_echo_client: connected");
63+
spdlog::info("Uri: {}", msg->openInfo.uri);
64+
spdlog::info("Headers:");
65+
for (auto it : msg->openInfo.headers)
5766
{
58-
spdlog::info("ws_echo_client: connected");
59-
spdlog::info("Uri: {}", msg->openInfo.uri);
60-
spdlog::info("Headers:");
61-
for (auto it : msg->openInfo.headers)
62-
{
63-
spdlog::info("{}: {}", it.first, it.second);
64-
}
65-
66-
webSocket.send(sendMsg, binaryMode);
67+
spdlog::info("{}: {}", it.first, it.second);
6768
}
68-
});
69+
70+
webSocket.send(sendMsg, binaryMode);
71+
}
72+
else if (msg->type == ix::WebSocketMessageType::Pong)
73+
{
74+
spdlog::info("Received pong {}", msg->str);
75+
}
76+
});
6977

7078
auto timer = [&receivedCount, &receivedCountTotal, &receivedCountPerSecs] {
7179
setThreadName("Timer");

ws/ws_push_server.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* ws_push_server.cpp
3+
* Author: Benjamin Sergeant
4+
* Copyright (c) 2020 Machine Zone, Inc. All rights reserved.
5+
*/
6+
7+
#include <ixwebsocket/IXNetSystem.h>
8+
#include <ixwebsocket/IXWebSocketServer.h>
9+
#include <spdlog/spdlog.h>
10+
#include <sstream>
11+
12+
namespace ix
13+
{
14+
int ws_push_server(int port,
15+
bool greetings,
16+
const std::string& hostname,
17+
const ix::SocketTLSOptions& tlsOptions,
18+
bool ipv6,
19+
bool disablePerMessageDeflate,
20+
bool disablePong,
21+
const std::string& sendMsg)
22+
{
23+
spdlog::info("Listening on {}:{}", hostname, port);
24+
25+
ix::WebSocketServer server(port,
26+
hostname,
27+
SocketServer::kDefaultTcpBacklog,
28+
SocketServer::kDefaultMaxConnections,
29+
WebSocketServer::kDefaultHandShakeTimeoutSecs,
30+
(ipv6) ? AF_INET6 : AF_INET);
31+
32+
server.setTLSOptions(tlsOptions);
33+
34+
if (disablePerMessageDeflate)
35+
{
36+
spdlog::info("Disable per message deflate");
37+
server.disablePerMessageDeflate();
38+
}
39+
40+
if (disablePong)
41+
{
42+
spdlog::info("Disable responding to PING messages with PONG");
43+
server.disablePong();
44+
}
45+
46+
server.setOnClientMessageCallback(
47+
[greetings, &sendMsg](std::shared_ptr<ConnectionState> connectionState,
48+
ConnectionInfo& connectionInfo,
49+
WebSocket& webSocket,
50+
const WebSocketMessagePtr& msg) {
51+
auto remoteIp = connectionInfo.remoteIp;
52+
if (msg->type == ix::WebSocketMessageType::Open)
53+
{
54+
spdlog::info("New connection");
55+
spdlog::info("remote ip: {}", remoteIp);
56+
spdlog::info("id: {}", connectionState->getId());
57+
spdlog::info("Uri: {}", msg->openInfo.uri);
58+
spdlog::info("Headers:");
59+
for (auto it : msg->openInfo.headers)
60+
{
61+
spdlog::info("{}: {}", it.first, it.second);
62+
}
63+
64+
if (greetings)
65+
{
66+
webSocket.sendText("Welcome !");
67+
}
68+
69+
bool binary = false;
70+
while (true)
71+
{
72+
webSocket.send(sendMsg, binary);
73+
}
74+
}
75+
else if (msg->type == ix::WebSocketMessageType::Close)
76+
{
77+
spdlog::info("Closed connection: client id {} code {} reason {}",
78+
connectionState->getId(),
79+
msg->closeInfo.code,
80+
msg->closeInfo.reason);
81+
}
82+
else if (msg->type == ix::WebSocketMessageType::Error)
83+
{
84+
spdlog::error("Connection error: {}", msg->errorInfo.reason);
85+
spdlog::error("#retries: {}", msg->errorInfo.retries);
86+
spdlog::error("Wait time(ms): {}", msg->errorInfo.wait_time);
87+
spdlog::error("HTTP Status: {}", msg->errorInfo.http_status);
88+
}
89+
else if (msg->type == ix::WebSocketMessageType::Message)
90+
{
91+
spdlog::info("Received {} bytes", msg->wireSize);
92+
webSocket.send(msg->str, msg->binary);
93+
}
94+
});
95+
96+
auto res = server.listen();
97+
if (!res.first)
98+
{
99+
spdlog::error(res.second);
100+
return 1;
101+
}
102+
103+
server.start();
104+
server.wait();
105+
106+
return 0;
107+
}
108+
} // namespace ix

0 commit comments

Comments
 (0)