Skip to content

Commit 42b7a93

Browse files
committed
Share thread-safe outgoing queue implementation between wspp and winrt.
1 parent 8b878f7 commit 42b7a93

File tree

6 files changed

+90
-71
lines changed

6 files changed

+90
-71
lines changed

Release/src/CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,14 @@ elseif(CPPREST_WEBSOCKETS_IMPL STREQUAL "winrt")
4545
websockets/client/ws_msg.cpp
4646
websockets/client/ws_client.cpp
4747
websockets/client/ws_client_winrt.cpp
48+
websockets/client/ws_client_impl.h
4849
)
4950
elseif(CPPREST_WEBSOCKETS_IMPL STREQUAL "wspp")
5051
list(APPEND SOURCES
5152
websockets/client/ws_msg.cpp
5253
websockets/client/ws_client.cpp
5354
websockets/client/ws_client_wspp.cpp
55+
websockets/client/ws_client_impl.h
5456
)
5557
endif()
5658

Release/src/build/common.vcxitems

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
<ClInclude Include="$(MSBuildThisFileDirectory)..\http\client\http_client_impl.h" />
8787
<ClInclude Include="$(MSBuildThisFileDirectory)..\http\listener\http_server_impl.h" />
8888
<ClInclude Include="$(MSBuildThisFileDirectory)..\http\common\internal_http_helpers.h" />
89+
<ClInclude Include="$(MSBuildThisFileDirectory)..\websockets\client\ws_client_impl.h" />
8990
<ClInclude Include="$(MSBuildThisFileDirectory)..\pch\stdafx.h" />
9091
</ItemGroup>
9192
<ItemGroup>

Release/src/build/common.vcxitems.filters

+6-1
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,12 @@
218218
<ClInclude Include="$(MSBuildThisFileDirectory)..\http\listener\http_server_impl.h">
219219
<Filter>Header Files\private</Filter>
220220
</ClInclude>
221-
<ClInclude Include="$(MSBuildThisFileDirectory)..\http\common\internal_http_helpers.h" />
221+
<ClInclude Include="$(MSBuildThisFileDirectory)..\websockets\client\ws_client_impl.h">
222+
<Filter>Header Files\private</Filter>
223+
</ClInclude>
224+
<ClInclude Include="$(MSBuildThisFileDirectory)..\http\common\internal_http_helpers.h">
225+
<Filter>Header Files\private</Filter>
226+
</ClInclude>
222227
</ItemGroup>
223228
<ItemGroup>
224229
<None Include="$(MSBuildThisFileDirectory)..\..\include\cpprest\details\http_constants.dat">
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
#pragma once
2+
3+
#include <queue>
4+
#include <mutex>
5+
#include "cpprest/ws_client.h"
6+
#include "cpprest/ws_msg.h"
7+
8+
namespace web
9+
{
10+
namespace websockets
11+
{
12+
namespace client
13+
{
14+
namespace details
15+
{
16+
17+
struct outgoing_msg_queue
18+
{
19+
enum class state
20+
{
21+
was_empty,
22+
was_not_empty,
23+
};
24+
25+
state push(websocket_outgoing_message& msg)
26+
{
27+
state ret = state::was_not_empty;
28+
std::lock_guard<std::mutex> lock(m_lock);
29+
if (m_queue.empty())
30+
{
31+
ret = state::was_empty;
32+
}
33+
34+
m_queue.push(msg);
35+
return ret;
36+
}
37+
38+
bool pop_and_peek(websocket_outgoing_message& msg)
39+
{
40+
std::lock_guard<std::mutex> lock(m_lock);
41+
42+
m_queue.pop();
43+
44+
if (m_queue.empty())
45+
{
46+
return false;
47+
}
48+
msg = m_queue.front();
49+
return true;
50+
}
51+
52+
private:
53+
std::mutex m_lock;
54+
std::queue<websocket_outgoing_message> m_queue;
55+
};
56+
57+
58+
}}}}

Release/src/websockets/client/ws_client_winrt.cpp

+6-30
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
#if !defined(CPPREST_EXCLUDE_WEBSOCKETS)
1717

18+
#include "ws_client_impl.h"
19+
1820
using namespace ::Windows::Foundation;
1921
using namespace ::Windows::Storage;
2022
using namespace ::Windows::Storage::Streams;
@@ -230,19 +232,10 @@ class winrt_callback_client : public websocket_client_callback_impl, public std:
230232
return pplx::task_from_exception<void>(websocket_exception("Message size too large. Ensure message length is less than UINT_MAX."));
231233
}
232234

233-
bool msg_pending = false;
234-
{
235-
std::lock_guard<std::mutex> lock(m_send_lock);
236-
if (m_outgoing_msg_queue.size() > 0)
237-
{
238-
msg_pending = true;
239-
}
240-
241-
m_outgoing_msg_queue.push(msg);
242-
}
235+
auto msg_pending = m_out_queue.push(msg);
243236

244237
// No sends in progress
245-
if (msg_pending == false)
238+
if (msg_pending == outgoing_msg_queue::state::was_empty)
246239
{
247240
// Start sending the message
248241
send_msg(msg);
@@ -379,22 +372,8 @@ class winrt_callback_client : public websocket_client_callback_impl, public std:
379372
msg.signal_body_sent();
380373
}
381374

382-
bool msg_pending = false;
383375
websocket_outgoing_message next_msg;
384-
{
385-
// Only hold the lock when actually touching the queue.
386-
std::lock_guard<std::mutex> lock(this_client->m_send_lock);
387-
388-
// First message in queue has been sent
389-
this_client->m_outgoing_msg_queue.pop();
390-
391-
if (this_client->m_outgoing_msg_queue.size() > 0)
392-
{
393-
next_msg = this_client->m_outgoing_msg_queue.front();
394-
msg_pending = true;
395-
}
396-
}
397-
376+
bool msg_pending = this_client->m_out_queue.pop_and_peek(next_msg);
398377
if (msg_pending)
399378
{
400379
this_client->send_msg(next_msg);
@@ -443,11 +422,8 @@ class winrt_callback_client : public websocket_client_callback_impl, public std:
443422
std::function<void(websocket_incoming_message)> m_external_message_handler;
444423
std::function<void(websocket_close_status, const utility::string_t&, const std::error_code&)> m_external_close_handler;
445424

446-
// The implementation has to ensure ordering of send requests
447-
std::mutex m_send_lock;
448-
449425
// Queue to track pending sends
450-
std::queue<websocket_outgoing_message> m_outgoing_msg_queue;
426+
outgoing_msg_queue m_out_queue;
451427
};
452428

453429
void ReceiveContext::OnReceive(MessageWebSocket^ sender, MessageWebSocketMessageReceivedEventArgs^ args)

Release/src/websockets/client/ws_client_wspp.cpp

+17-40
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include "cpprest/details/x509_cert_utilities.h"
1919
#include "pplx/threadpool.h"
2020

21+
#include "ws_client_impl.h"
22+
2123
// Force websocketpp to use C++ std::error_code instead of Boost.
2224
#define _WEBSOCKETPP_CPP11_SYSTEM_ERROR_
2325
#if defined(_MSC_VER)
@@ -401,10 +403,10 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
401403
{
402404
case websocket_message_type::text_message:
403405
case websocket_message_type::binary_message:
404-
case websocket_message_type::pong:
406+
case websocket_message_type::pong:
405407
break;
406408
default:
407-
return pplx::task_from_exception<void>(websocket_exception("Invalid message type"));
409+
return pplx::task_from_exception<void>(websocket_exception("Message Type not supported."));
408410
}
409411

410412
const auto length = msg.m_length;
@@ -417,22 +419,13 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
417419
return pplx::task_from_exception<void>(websocket_exception("Message size too large. Ensure message length is less than UINT_MAX."));
418420
}
419421

420-
bool msg_pending = false;
421-
{
422-
std::lock_guard<std::mutex> lock(m_send_lock);
423-
if (m_outgoing_msg_queue.size() > 0)
424-
{
425-
msg_pending = true;
426-
}
427-
428-
m_outgoing_msg_queue.push(msg);
429-
}
422+
auto msg_pending = m_out_queue.push(msg);
430423

431424
// No sends in progress
432-
if (msg_pending == false)
425+
if (msg_pending == outgoing_msg_queue::state::was_empty)
433426
{
434-
// Start sending the message
435-
send_msg(msg);
427+
// Start sending the message
428+
send_msg(msg);
436429
}
437430

438431
return pplx::create_task(msg.body_sent());
@@ -568,21 +561,8 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
568561
msg.signal_body_sent();
569562
}
570563

571-
bool msg_pending = false;
572564
websocket_outgoing_message next_msg;
573-
{
574-
// Only hold the lock when actually touching the queue.
575-
std::lock_guard<std::mutex> lock(this_client->m_send_lock);
576-
577-
// First message in queue has been sent
578-
this_client->m_outgoing_msg_queue.pop();
579-
580-
if (this_client->m_outgoing_msg_queue.size() > 0)
581-
{
582-
next_msg = this_client->m_outgoing_msg_queue.front();
583-
msg_pending = true;
584-
}
585-
}
565+
bool msg_pending = this_client->m_out_queue.pop_and_peek(next_msg);
586566

587567
if (msg_pending)
588568
{
@@ -681,19 +661,19 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
681661
ec);
682662
break;
683663
case websocket_message_type::binary_message:
684-
client.send(
664+
client.send(
685665
this_client->m_con,
686666
sp_allocated.get(),
687667
length,
688668
websocketpp::frame::opcode::binary,
689669
ec);
690670
break;
691-
case websocket_message_type::pong:
692-
client.pong(
693-
this_client->m_con,
694-
"",
695-
ec);
696-
break;
671+
case websocket_message_type::pong:
672+
client.pong(
673+
this_client->m_con,
674+
"",
675+
ec);
676+
break;
697677
default:
698678
// This case should have already been filtered above.
699679
std::abort();
@@ -775,11 +755,8 @@ class wspp_callback_client : public websocket_client_callback_impl, public std::
775755
State m_state;
776756
std::unique_ptr<websocketpp_client_base> m_client;
777757

778-
// Guards access to m_outgoing_msg_queue
779-
std::mutex m_send_lock;
780-
781758
// Queue to track pending sends
782-
std::queue<websocket_outgoing_message> m_outgoing_msg_queue;
759+
outgoing_msg_queue m_out_queue;
783760

784761
// External callback for handling received and close event
785762
std::function<void(websocket_incoming_message)> m_external_message_handler;

0 commit comments

Comments
 (0)