Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IM: Create ReadHandler after Session Establishment for Subscription Resumption #30491

Merged
merged 8 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ static_library("app") {
]
}

if (chip_persist_subscriptions) {
sources += [
"SubscriptionResumptionSessionEstablisher.cpp",
"SubscriptionResumptionSessionEstablisher.h",
]
}

if (chip_enable_read_client) {
sources += [
"BufferedReadCallback.cpp",
Expand Down
35 changes: 20 additions & 15 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@
namespace chip {
namespace app {

class AutoReleaseSubscriptionInfoIterator
{
public:
AutoReleaseSubscriptionInfoIterator(SubscriptionResumptionStorage::SubscriptionInfoIterator * iterator) : mIterator(iterator){};
~AutoReleaseSubscriptionInfoIterator() { mIterator->Release(); }

SubscriptionResumptionStorage::SubscriptionInfoIterator * operator->() const { return mIterator; }

private:
SubscriptionResumptionStorage::SubscriptionInfoIterator * mIterator;
};

using Protocols::InteractionModel::Status;

Global<InteractionModelEngine> sInteractionModelEngine;
Expand Down Expand Up @@ -1835,7 +1847,7 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
bool resumedSubscriptions = false;
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo;
auto * iterator = imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions();
AutoReleaseSubscriptionInfoIterator iterator(imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions());
while (iterator->Next(subscriptionInfo))
{
// If subscription happens between reboot and this timer callback, it's already live and should skip resumption
Expand All @@ -1853,31 +1865,24 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
continue;
}

auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount,
requestedEventPathCount))
auto subscriptionResumptionSessionEstablisher = Platform::MakeUnique<SubscriptionResumptionSessionEstablisher>();
if (subscriptionResumptionSessionEstablisher == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
iterator->Release();
ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionSessionEstablisher");
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
return;
}

ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (handler == nullptr)
if (subscriptionResumptionSessionEstablisher->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) !=
CHIP_NO_ERROR)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
iterator->Release();
ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId);
return;
}

ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId);
handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo);
subscriptionResumptionSessionEstablisher.release();
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
resumedSubscriptions = true;
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
}
iterator->Release();

#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
// If no persisted subscriptions needed resumption then all resumption retries are done
Expand Down
2 changes: 2 additions & 0 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <app/AppConfig.h>
#include <app/MessageDef/AttributeReportIBs.h>
#include <app/MessageDef/ReportDataMessage.h>
#include <app/SubscriptionResumptionSessionEstablisher.h>
#include <lib/core/CHIPCore.h>
#include <lib/support/CodeUtils.h>
#include <lib/support/DLLUtil.h>
Expand Down Expand Up @@ -377,6 +378,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
friend class reporting::Engine;
friend class TestCommandInteraction;
friend class TestInteractionModelEngine;
friend class SubscriptionResumptionSessionEstablisher;
using Status = Protocols::InteractionModel::Status;

void OnDone(CommandHandler & apCommandObj) override;
Expand Down
93 changes: 33 additions & 60 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
InteractionType aInteractionType, Observer * observer) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
,
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
#endif
{
VerifyOrDie(apExchangeContext != nullptr);

Expand All @@ -83,8 +79,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
mExchangeCtx(*this), mManagementCallback(apCallback)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
Expand All @@ -93,41 +88,57 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mObserver = observer;
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle,
SubscriptionResumptionSessionEstablisher & resumptionSessionEstablisher)
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);
mSubscriptionId = resumptionSessionEstablisher.mSubscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = resumptionSessionEstablisher.mSubscriptionInfo.mMinInterval;
mMaxInterval = resumptionSessionEstablisher.mSubscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, resumptionSessionEstablisher.mSubscriptionInfo.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
// the object pool managed by the IM engine
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err =
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
AttributePathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths.AllocatedSize(); i++)
{
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
EventPathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}

// Ask IM engine to start CASE session with subscriber
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
mSessionHandle.Grab(sessionHandle);

SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*this);
}
// Notify the observer that a subscription has been resumed
mObserver->OnSubscriptionEstablished(this);

MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand Down Expand Up @@ -892,43 +903,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
SetStateFlag(aFlag, false);
}

void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);

_this->mSessionHandle.Grab(sessionHandle);

_this->SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = _this->mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*_this);
}
// Notify the observer that a subscription has been resumed
_this->mObserver->OnSubscriptionEstablished(_this);

_this->MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);
VerifyOrDie(_this != nullptr);

// TODO: Have a retry mechanism tied to wake interval for IC devices
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
err.Format());
_this->Close();
}

} // namespace app
} // namespace chip
34 changes: 11 additions & 23 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <app/MessageDef/EventPathIBs.h>
#include <app/ObjectList.h>
#include <app/OperationalSessionSetup.h>
#include <app/SubscriptionResumptionSessionEstablisher.h>
#include <app/SubscriptionResumptionStorage.h>
#include <lib/core/CHIPCallback.h>
#include <lib/core/CHIPCore.h>
Expand Down Expand Up @@ -253,6 +254,16 @@ class ReadHandler : public Messaging::ExchangeDelegate
return CHIP_NO_ERROR;
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Initialize a ReadHandler for a resumed subsciption
*
* Used after the SubscriptionResumptionSessionEstablisher establishs the CASE session
*/
void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionSessionEstablisher & sessionEstablisher);
#endif

private:
PriorityLevel GetCurrentPriority() const { return mCurrentPriority; }
EventNumber & GetEventMin() { return mEventMin; }
Expand Down Expand Up @@ -302,18 +313,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
void OnInitialRequest(System::PacketBufferHandle && aPayload);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Resume a persisted subscription
*
* Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session
* with the subscriber if one doesn't already exist, and send full priming report when connected.
*/
void ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
#endif

/**
* Send ReportData to initiator
*
Expand Down Expand Up @@ -485,11 +484,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
/// @param aFlag Flag to clear
void ClearStateFlag(ReadHandlerFlags aFlag);

// Helpers for continuing the subscription resumption
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle);
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);

AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);

// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
Expand Down Expand Up @@ -571,12 +565,6 @@ class ReadHandler : public Messaging::ExchangeDelegate

// TODO (#27675): Merge all observers into one and that one will dispatch the callbacks to the right place.
Observer * mObserver = nullptr;

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
#endif
};
} // namespace app
} // namespace chip
Loading
Loading