Skip to content

Commit

Permalink
cherry pick PR project-chip#30491: IM: Create ReadHandler after Sessi…
Browse files Browse the repository at this point in the history
…on Establishment for Subscription Resumption
  • Loading branch information
Jerry-ESP committed Jan 5, 2024
1 parent 24271f5 commit 5832bc1
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 88 deletions.
7 changes: 7 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ static_library("app") {
]
}

if (chip_persist_subscriptions) {
sources += [
"SubscriptionResumptionSessionEstablisher.cpp",
"SubscriptionResumptionSessionEstablisher.h",
]
}

public_deps = [
":app_config",
"${chip_root}/src/access",
Expand Down
35 changes: 20 additions & 15 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ extern bool emberAfContainsAttribute(chip::EndpointId endpoint, chip::ClusterId
namespace chip {
namespace app {

class AutoReleaseSubscriptionInfoIterator
{
public:
AutoReleaseSubscriptionInfoIterator(SubscriptionResumptionStorage::SubscriptionInfoIterator * iterator) : mIterator(iterator){};
~AutoReleaseSubscriptionInfoIterator() { mIterator->Release(); }

SubscriptionResumptionStorage::SubscriptionInfoIterator * operator->() const { return mIterator; }

private:
SubscriptionResumptionStorage::SubscriptionInfoIterator * mIterator;
};

using Protocols::InteractionModel::Status;

InteractionModelEngine sInteractionModelEngine;
Expand Down Expand Up @@ -1649,7 +1661,7 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
VerifyOrReturn(apAppState != nullptr);
InteractionModelEngine * imEngine = static_cast<InteractionModelEngine *>(apAppState);
SubscriptionResumptionStorage::SubscriptionInfo subscriptionInfo;
auto * iterator = imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions();
AutoReleaseSubscriptionInfoIterator iterator(imEngine->mpSubscriptionResumptionStorage->IterateSubscriptions());
while (iterator->Next(subscriptionInfo))
{
// If subscription happens between reboot and this timer callback, it's already live and should skip resumption
Expand All @@ -1667,28 +1679,21 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
continue;
}

auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount,
requestedEventPathCount))
auto subscriptionResumptionSessionEstablisher = Platform::MakeUnique<SubscriptionResumptionSessionEstablisher>();
if (subscriptionResumptionSessionEstablisher == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
iterator->Release();
ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionSessionEstablisher");
return;
}

ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine);
if (handler == nullptr)
if (subscriptionResumptionSessionEstablisher->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) !=
CHIP_NO_ERROR)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
iterator->Release();
ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId);
return;
}

ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId);
handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo);
subscriptionResumptionSessionEstablisher.release();
}
iterator->Release();
#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
}

Expand Down
2 changes: 2 additions & 0 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <access/AccessControl.h>
#include <app/MessageDef/AttributeReportIBs.h>
#include <app/MessageDef/ReportDataMessage.h>
#include <app/SubscriptionResumptionSessionEstablisher.h>
#include <lib/core/CHIPCore.h>
#include <lib/support/CodeUtils.h>
#include <lib/support/DLLUtil.h>
Expand Down Expand Up @@ -365,6 +366,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
private:
friend class reporting::Engine;
friend class TestCommandInteraction;
friend class SubscriptionResumptionSessionEstablisher;
using Status = Protocols::InteractionModel::Status;

void OnDone(CommandHandler & apCommandObj) override;
Expand Down
73 changes: 23 additions & 50 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
InteractionType aInteractionType) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
,
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
#endif
{
VerifyOrDie(apExchangeContext != nullptr);

Expand All @@ -67,48 +63,53 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
mExchangeCtx(*this), mManagementCallback(apCallback)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle,
SubscriptionResumptionSessionEstablisher & resumptionSessionEstablisher)
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);
mSubscriptionId = resumptionSessionEstablisher.mSubscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = resumptionSessionEstablisher.mSubscriptionInfo.mMinInterval;
mMaxInterval = resumptionSessionEstablisher.mSubscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, resumptionSessionEstablisher.mSubscriptionInfo.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
// the object pool managed by the IM engine
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err =
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
AttributePathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
for (size_t i = 0; i < resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths.AllocatedSize(); i++)
{
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
EventPathParams params = resumptionSessionEstablisher.mSubscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}

// Ask IM engine to start CASE session with subscriber
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
mSessionHandle.Grab(sessionHandle);

MoveToState(HandlerState::GeneratingReports);

ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand Down Expand Up @@ -866,33 +867,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
SetStateFlag(aFlag, false);
}

void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);

_this->mSessionHandle.Grab(sessionHandle);

_this->MoveToState(HandlerState::GeneratingReports);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);
VerifyOrDie(_this != nullptr);

// TODO: Have a retry mechanism tied to wake interval for IC devices
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
err.Format());
_this->Close();
}

} // namespace app
} // namespace chip
34 changes: 11 additions & 23 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <app/MessageDef/EventPathIBs.h>
#include <app/ObjectList.h>
#include <app/OperationalSessionSetup.h>
#include <app/SubscriptionResumptionSessionEstablisher.h>
#include <app/SubscriptionResumptionStorage.h>
#include <lib/core/CHIPCallback.h>
#include <lib/core/CHIPCore.h>
Expand Down Expand Up @@ -206,6 +207,16 @@ class ReadHandler : public Messaging::ExchangeDelegate
return CHIP_NO_ERROR;
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Initialize a ReadHandler for a resumed subsciption
*
* Used after the SubscriptionResumptionSessionEstablisher establishs the CASE session
*/
void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionSessionEstablisher & sessionEstablisher);
#endif

private:
PriorityLevel GetCurrentPriority() const { return mCurrentPriority; }
EventNumber & GetEventMin() { return mEventMin; }
Expand Down Expand Up @@ -258,18 +269,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
void OnInitialRequest(System::PacketBufferHandle && aPayload);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Resume a persisted subscription
*
* Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session
* with the subscriber if one doesn't already exist, and send full priming report when connected.
*/
void ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
#endif

/**
* Send ReportData to initiator
*
Expand Down Expand Up @@ -420,11 +419,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
void SetStateFlag(ReadHandlerFlags aFlag, bool aValue = true);
void ClearStateFlag(ReadHandlerFlags aFlag);

// Helpers for continuing the subscription resumption
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle);
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);

AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);

// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
Expand Down Expand Up @@ -503,12 +497,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
PriorityLevel mCurrentPriority = PriorityLevel::Invalid;
BitFlags<ReadHandlerFlags> mFlags;
InteractionType mInteractionType = InteractionType::Read;

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
#endif
};
} // namespace app
} // namespace chip
Loading

0 comments on commit 5832bc1

Please sign in to comment.