From 10005285360a6590a61f09e0232f1c51477bbd20 Mon Sep 17 00:00:00 2001 From: Lukas Zeller Date: Thu, 28 Jul 2022 18:14:28 +0200 Subject: [PATCH] SystemLayerImplSelect: make safe with unbalanced use of Request/ClearCallback (#21349) Original version (3ca630c7) could cause non balanced dispatch_suspend/dispatch_resume which is not allowed by the dispatch API, as pointed out in #pullrequestreview-1051517126. This version does not use dispatch_suspend/resume any more to prevent the problem, additionally making the implementation behaving as similar as possible as, and compatible with, the select() based fallback. The fallback is required by some tests even on dispatch based platforms, when no dispatch queue can be started. But for real applications on dispatch based platforms, there should always be a dispatch queue, so the implementation now issues a warning when RequestCallbackOnPendingXXX is called without a dispatch queue available. --- src/system/SystemLayerImplSelect.cpp | 111 ++++++++++++++++----------- src/system/SystemLayerImplSelect.h | 1 + 2 files changed, 66 insertions(+), 46 deletions(-) diff --git a/src/system/SystemLayerImplSelect.cpp b/src/system/SystemLayerImplSelect.cpp index 5d5e07e890e0f4..98f4fe71547826 100644 --- a/src/system/SystemLayerImplSelect.cpp +++ b/src/system/SystemLayerImplSelect.cpp @@ -77,6 +77,12 @@ void LayerImplSelect::Shutdown() } } mTimerPool.ReleaseAll(); + + for (auto & w : mSocketWatchPool) + { + w.DisableAndClear(); + } + #else // CHIP_SYSTEM_CONFIG_USE_DISPATCH mTimerList.Clear(); mTimerPool.ReleaseAll(); @@ -248,23 +254,32 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingRead(SocketWatchToken token) watch->mPendingIO.Set(SocketEventFlags::kRead); #if CHIP_SYSTEM_CONFIG_USE_DISPATCH - dispatch_queue_t dispatchQueue = GetDispatchQueue(); - if (watch->mWrSource) + if (watch->mRdSource == nullptr) { - dispatch_resume(watch->mRdSource); - } - else - { - if (dispatchQueue) + // First time requesting callback for read events: install a dispatch source + dispatch_queue_t dispatchQueue = GetDispatchQueue(); + if (dispatchQueue == nullptr) + { + // Note: if no dispatch queue is available, callbacks most probably will not work, + // unless, as in some tests from a test-specific local loop, + // the select based event handling (Prepare/WaitFor/HandleEvents) is invoked. + ChipLogError(DeviceLayer, + "RequestCallbackOnPendingRead with no dispatch queue: callback may not work (might be ok in tests)"); + } + else { watch->mRdSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, static_cast(watch->mFD), 0, dispatchQueue); ReturnErrorCodeIf(watch->mRdSource == nullptr, CHIP_ERROR_NO_MEMORY); dispatch_source_set_event_handler(watch->mRdSource, ^{ - SocketEvents events; - events.Set(SocketEventFlags::kRead); - watch->mCallback(events, watch->mCallbackData); + if (watch->mPendingIO.Has(SocketEventFlags::kRead) && watch->mCallback != nullptr) + { + SocketEvents events; + events.Set(SocketEventFlags::kRead); + watch->mCallback(events, watch->mCallbackData); + } }); + // only now we are sure the source exists and can become active dispatch_activate(watch->mRdSource); } } @@ -281,23 +296,33 @@ CHIP_ERROR LayerImplSelect::RequestCallbackOnPendingWrite(SocketWatchToken token watch->mPendingIO.Set(SocketEventFlags::kWrite); #if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mWrSource) - { - dispatch_resume(watch->mWrSource); - } - else + if (watch->mWrSource == nullptr) { + // First time requesting callback for read events: install a dispatch source dispatch_queue_t dispatchQueue = GetDispatchQueue(); - if (dispatchQueue) + if (dispatchQueue == nullptr) + { + // Note: if no dispatch queue is available, callbacks most probably will not work, + // unless, as in some tests from a test-specific local loop, + // the select based event handling (Prepare/WaitFor/HandleEvents) is invoked. + ChipLogError(DeviceLayer, + "RequestCallbackOnPendingWrite with no dispatch queue: callback may not work (might be ok in tests)"); + } + else { watch->mWrSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, static_cast(watch->mFD), 0, dispatchQueue); ReturnErrorCodeIf(watch->mWrSource == nullptr, CHIP_ERROR_NO_MEMORY); dispatch_source_set_event_handler(watch->mWrSource, ^{ - SocketEvents events; - events.Set(SocketEventFlags::kWrite); - watch->mCallback(events, watch->mCallbackData); + if (watch->mPendingIO.Has(SocketEventFlags::kWrite) && watch->mCallback != nullptr) + { + SocketEvents events; + events.Set(SocketEventFlags::kWrite); + watch->mCallback(events, watch->mCallbackData); + } }); + // only now we are sure the source exists and can become active + watch->mPendingIO.Set(SocketEventFlags::kWrite); dispatch_activate(watch->mWrSource); } } @@ -310,14 +335,8 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingRead(SocketWatchToken token) { SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - watch->mPendingIO.Clear(SocketEventFlags::kRead); -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mRdSource) - { - dispatch_suspend(watch->mRdSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + watch->mPendingIO.Clear(SocketEventFlags::kRead); return CHIP_NO_ERROR; } @@ -326,14 +345,8 @@ CHIP_ERROR LayerImplSelect::ClearCallbackOnPendingWrite(SocketWatchToken token) { SocketWatch * watch = reinterpret_cast(token); VerifyOrReturnError(watch != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - watch->mPendingIO.Clear(SocketEventFlags::kWrite); -#if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mWrSource) - { - dispatch_suspend(watch->mWrSource); - } -#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + watch->mPendingIO.Clear(SocketEventFlags::kWrite); return CHIP_NO_ERROR; } @@ -347,21 +360,10 @@ CHIP_ERROR LayerImplSelect::StopWatchingSocket(SocketWatchToken * tokenInOut) VerifyOrReturnError(watch->mFD >= 0, CHIP_ERROR_INCORRECT_STATE); #if CHIP_SYSTEM_CONFIG_USE_DISPATCH - if (watch->mRdSource) - { - dispatch_cancel(watch->mRdSource); - dispatch_release(watch->mRdSource); - } - if (watch->mWrSource) - { - dispatch_cancel(watch->mWrSource); - dispatch_release(watch->mWrSource); - } -#endif - + watch->DisableAndClear(); +#else watch->Clear(); -#if !CHIP_SYSTEM_CONFIG_USE_DISPATCH // Wake the thread calling select so that it stops selecting on the socket. Signal(); #endif @@ -511,5 +513,22 @@ void LayerImplSelect::SocketWatch::Clear() #endif } +#if CHIP_SYSTEM_CONFIG_USE_DISPATCH +void LayerImplSelect::SocketWatch::DisableAndClear() +{ + if (mRdSource) + { + dispatch_source_cancel(mRdSource); + dispatch_release(mRdSource); + } + if (mWrSource) + { + dispatch_source_cancel(mWrSource); + dispatch_release(mWrSource); + } + Clear(); +} +#endif // CHIP_SYSTEM_CONFIG_USE_DISPATCH + } // namespace System } // namespace chip diff --git a/src/system/SystemLayerImplSelect.h b/src/system/SystemLayerImplSelect.h index 7f2724ed732d11..f193a8860d961c 100644 --- a/src/system/SystemLayerImplSelect.h +++ b/src/system/SystemLayerImplSelect.h @@ -93,6 +93,7 @@ class LayerImplSelect : public LayerSocketsLoop #if CHIP_SYSTEM_CONFIG_USE_DISPATCH dispatch_source_t mRdSource; dispatch_source_t mWrSource; + void DisableAndClear(); #endif intptr_t mCallbackData; };