@@ -89,7 +89,6 @@ DispatchQueue GetCurrentOrSerialQueue() noexcept {
8989
9090 return queue;
9191}
92-
9392} // namespace
9493
9594namespace Microsoft ::React::Networking {
@@ -104,7 +103,6 @@ WinRTWebSocketResource2::WinRTWebSocketResource2(
104103 : m_socket{std::move (socket)},
105104 m_writer (std::move(writer)),
106105 m_readyState{ReadyState::Connecting},
107- m_connectPerformed{CreateEvent (/* attributes*/ nullptr , /* manual reset*/ true , /* state*/ false , /* name*/ nullptr )},
108106 m_callingQueue{callingQueue} {
109107 for (const auto &certException : certExceptions) {
110108 m_socket.Control ().IgnorableServerCertificateErrors ().Append (certException);
@@ -221,125 +219,112 @@ void WinRTWebSocketResource2::OnClosed(IWebSocket const &sender, IWebSocketClose
221219
222220fire_and_forget WinRTWebSocketResource2::PerformConnect (Uri &&uri) noexcept {
223221 auto self = shared_from_this ();
224- auto coUri = std::move (uri);
222+ auto movedUri = std::move (uri);
225223
226224 co_await resume_in_queue (self->m_backgroundQueue );
227225
228- auto async = self->m_socket .ConnectAsync (coUri);
229- co_await lessthrow_await_adapter<IAsyncAction>{async};
230-
231- co_await resume_in_queue (self->m_callingQueue );
232-
233- auto result = async.ErrorCode ();
234-
235- try {
236- if (result >= 0 ) { // Non-failing HRESULT
237- co_await resume_in_queue (self->m_backgroundQueue );
238- self->m_readyState = ReadyState::Open;
239-
240- co_await resume_in_queue (self->m_callingQueue );
241- if (self->m_connectHandler ) {
242- self->m_connectHandler ();
243- }
244- } else {
245- self->Fail (std::move (result), ErrorType::Connection);
246- }
247- } catch (hresult_error const &e) {
248- self->Fail (e, ErrorType::Connection);
249- } catch (std::exception const &e) {
250- self->Fail (e.what (), ErrorType::Connection);
251- }
252-
253- SetEvent (self->m_connectPerformed .get ());
226+ co_await self->m_sequencer .QueueTaskAsync (
227+ [self = self->shared_from_this (), coUri = std::move (movedUri)]() -> IAsyncAction {
228+ auto coSelf = self->shared_from_this ();
229+
230+ auto async = coSelf->m_socket .ConnectAsync (coUri);
231+ co_await lessthrow_await_adapter<IAsyncAction>{async};
232+
233+ auto result = async.ErrorCode ();
234+ try {
235+ if (result >= 0 ) { // Non-failing HRESULT
236+ coSelf->m_readyState = ReadyState::Open;
237+
238+ co_await resume_in_queue (coSelf->m_callingQueue );
239+ if (coSelf->m_connectHandler ) {
240+ coSelf->m_connectHandler ();
241+ }
242+ } else {
243+ coSelf->Fail (std::move (result), ErrorType::Connection);
244+ }
245+ } catch (hresult_error const &e) {
246+ coSelf->Fail (e, ErrorType::Connection);
247+ } catch (std::exception const &e) {
248+ coSelf->Fail (e.what (), ErrorType::Connection);
249+ }
250+ });
254251}
255252
256253fire_and_forget WinRTWebSocketResource2::PerformClose () noexcept {
257254 auto self = shared_from_this ();
258255
259- co_await resume_on_signal (self->m_connectPerformed .get ());
260-
261256 co_await resume_in_queue (self->m_backgroundQueue );
262257
263- // See https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close
264- co_await self->SendPendingMessages ();
258+ co_await self-> m_sequencer . QueueTaskAsync ([self = self-> shared_from_this ()]() -> IAsyncAction {
259+ auto coSelf = self->shared_from_this ();
265260
266- try {
267- self->m_socket .Close (static_cast <uint16_t >(m_closeCode), winrt::to_hstring (m_closeReason));
268- self->m_readyState = ReadyState::Closing;
269- } catch (winrt::hresult_invalid_argument const &e) {
270- Fail (e, ErrorType::Close);
271- } catch (hresult_error const &e) {
272- Fail (e, ErrorType::Close);
273- } catch (const std::exception &e) {
274- Fail (e.what (), ErrorType::Close);
275- }
261+ try {
262+ coSelf->m_socket .Close (static_cast <uint16_t >(coSelf->m_closeCode ), winrt::to_hstring (coSelf->m_closeReason ));
263+ coSelf->m_readyState = ReadyState::Closing;
264+ } catch (winrt::hresult_invalid_argument const &e) {
265+ coSelf->Fail (e, ErrorType::Close);
266+ } catch (hresult_error const &e) {
267+ coSelf->Fail (e, ErrorType::Close);
268+ } catch (const std::exception &e) {
269+ coSelf->Fail (e.what (), ErrorType::Close);
270+ }
271+
272+ co_return ;
273+ });
276274}
277275
278- fire_and_forget WinRTWebSocketResource2::PerformWrite (string &&message, bool isBinary) noexcept {
276+ fire_and_forget WinRTWebSocketResource2::EnqueueWrite (string &&message, bool isBinary) noexcept {
279277 auto self = shared_from_this ();
280278 string coMessage = std::move (message);
281279
282- co_await resume_in_queue (self->m_backgroundQueue ); // Ensure writes happen sequentially
283- self->m_outgoingMessages .emplace (std::move (coMessage), isBinary);
284-
285- co_await resume_on_signal (self->m_connectPerformed .get ());
286-
287280 co_await resume_in_queue (self->m_backgroundQueue );
288281
289- co_await self->SendPendingMessages ();
290- }
282+ co_await self->m_sequencer .QueueTaskAsync (
283+ [self = self->shared_from_this (), message = std::move (coMessage), isBinary]() -> IAsyncAction {
284+ auto coSelf = self->shared_from_this ();
285+ auto coMessage = std::move (message);
291286
292- IAsyncAction WinRTWebSocketResource2::SendPendingMessages () noexcept {
293- // Enforcing execution in the background queue.
294- // Awaiting of this coroutine will schedule its execution in the thread pool, ignoring the intended dispatch queue.
295- co_await resume_in_queue (m_backgroundQueue);
287+ co_await coSelf->PerformWrite (std::move (coMessage), isBinary);
288+ });
289+ }
296290
291+ IAsyncAction WinRTWebSocketResource2::PerformWrite (string &&message, bool isBinary) noexcept {
297292 auto self = shared_from_this ();
298293
299- while (!self->m_outgoingMessages .empty ()) {
300- if (self->m_readyState != ReadyState::Open) {
301- co_return ;
302- }
303-
304- size_t length = 0 ;
305- string messageLocal;
306- bool isBinaryLocal;
307- try {
308- std::tie (messageLocal, isBinaryLocal) = self->m_outgoingMessages .front ();
309- self->m_outgoingMessages .pop ();
310- if (isBinaryLocal) {
311- self->m_socket .Control ().MessageType (SocketMessageType::Binary);
312-
313- auto buffer = CryptographicBuffer::DecodeFromBase64String (winrt::to_hstring (messageLocal));
314- if (buffer) {
315- length = buffer.Length ();
316- self->m_writer .WriteBuffer (buffer);
317- }
318- } else {
319- self->m_socket .Control ().MessageType (SocketMessageType::Utf8);
294+ try {
295+ if (isBinary) {
296+ self->m_socket .Control ().MessageType (SocketMessageType::Binary);
320297
321- length = messageLocal.size ();
322- winrt::array_view<const uint8_t > view (
323- CheckedReinterpretCast<const uint8_t *>(messageLocal.c_str ()),
324- CheckedReinterpretCast<const uint8_t *>(messageLocal.c_str ()) + messageLocal.length ());
325- self->m_writer .WriteBytes (view);
298+ auto buffer = CryptographicBuffer::DecodeFromBase64String (winrt::to_hstring (message));
299+ if (buffer) {
300+ self->m_writer .WriteBuffer (buffer);
326301 }
327- } catch (hresult_error const &e) { // TODO: Remove after fixing unit tests exceptions.
328- self->Fail (e, ErrorType::Send);
329- co_return ;
330- } catch (const std::exception &e) {
331- self->Fail (e.what (), ErrorType::Send);
332- co_return ;
302+ } else {
303+ self->m_socket .Control ().MessageType (SocketMessageType::Utf8);
304+
305+ winrt::array_view<const uint8_t > view (
306+ CheckedReinterpretCast<const uint8_t *>(message.c_str ()),
307+ CheckedReinterpretCast<const uint8_t *>(message.c_str ()) + message.length ());
308+ self->m_writer .WriteBytes (view);
333309 }
310+ } catch (hresult_error const &e) { // TODO: Remove after fixing unit tests exceptions.
311+ self->Fail (e, ErrorType::Send);
312+ } catch (const std::exception &e) {
313+ self->Fail (e.what (), ErrorType::Send);
314+ }
334315
335- auto async = self->m_writer .StoreAsync ();
336- co_await lessthrow_await_adapter<DataWriterStoreOperation>{async};
316+ co_await resume_in_queue (self->m_backgroundQueue );
317+ // If an exception occurred, abort write process.
318+ if (self->m_readyState != ReadyState::Open) {
319+ co_return ;
320+ }
337321
338- auto result = async.ErrorCode ();
339- if (result < 0 ) {
340- Fail (std::move (result), ErrorType::Send);
341- co_return ;
342- }
322+ auto async = self->m_writer .StoreAsync ();
323+ co_await lessthrow_await_adapter<DataWriterStoreOperation>{async};
324+
325+ auto result = async.ErrorCode ();
326+ if (result < 0 ) {
327+ self->Fail (std::move (result), ErrorType::Send);
343328 }
344329}
345330
@@ -393,11 +378,7 @@ void WinRTWebSocketResource2::Connect(string &&url, const Protocols &protocols,
393378 m_socket.SetRequestHeader (L" Origin" , std::move (origin));
394379 }
395380 } catch (hresult_error const &e) {
396- Fail (e, ErrorType::Connection);
397-
398- SetEvent (m_connectPerformed.get ());
399-
400- return ;
381+ return Fail (e, ErrorType::Connection);
401382 }
402383
403384 PerformConnect (std::move (uri));
@@ -406,11 +387,11 @@ void WinRTWebSocketResource2::Connect(string &&url, const Protocols &protocols,
406387void WinRTWebSocketResource2::Ping () noexcept {}
407388
408389void WinRTWebSocketResource2::Send (string &&message) noexcept {
409- PerformWrite (std::move (message), false );
390+ EnqueueWrite (std::move (message), false );
410391}
411392
412393void WinRTWebSocketResource2::SendBinary (string &&base64String) noexcept {
413- PerformWrite (std::move (base64String), true );
394+ EnqueueWrite (std::move (base64String), true );
414395}
415396
416397void WinRTWebSocketResource2::Close (CloseCode code, const string &reason) noexcept {
0 commit comments