Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IM: Create ReadHandler after Session Establishment for Subscription Resumption #30491

Merged
merged 8 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ static_library("app") {
]
}

if (chip_persist_subscriptions) {
sources += [
"SubscriptionResumptionSessionEstablisher.cpp",
"SubscriptionResumptionSessionEstablisher.h",
]
}

if (chip_enable_read_client) {
sources += [
"BufferedReadCallback.cpp",
Expand Down
18 changes: 7 additions & 11 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1853,26 +1853,22 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
continue;
}

auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount,
requestedEventPathCount))
auto subscriptionResumptionSessionEstablisher = Platform::New<SubscriptionResumptionSessionEstablisher>();
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
if (subscriptionResumptionSessionEstablisher == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionSessionEstablisher");
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
iterator->Release();
return;
}

ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (handler == nullptr)
if (subscriptionResumptionSessionEstablisher->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) !=
CHIP_NO_ERROR)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId);
iterator->Release();
Platform::Delete(subscriptionResumptionSessionEstablisher);
return;
}

ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId);
handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo);
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
resumedSubscriptions = true;
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
Expand Down
2 changes: 2 additions & 0 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <app/AppConfig.h>
#include <app/MessageDef/AttributeReportIBs.h>
#include <app/MessageDef/ReportDataMessage.h>
#include <app/SubscriptionResumptionSessionEstablisher.h>
#include <lib/core/CHIPCore.h>
#include <lib/support/CodeUtils.h>
#include <lib/support/DLLUtil.h>
Expand Down Expand Up @@ -377,6 +378,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
friend class reporting::Engine;
friend class TestCommandInteraction;
friend class TestInteractionModelEngine;
friend class SubscriptionResumptionSessionEstablisher;
using Status = Protocols::InteractionModel::Status;

void OnDone(CommandHandler & apCommandObj) override;
Expand Down
92 changes: 32 additions & 60 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
InteractionType aInteractionType, Observer * observer) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
,
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
#endif
{
VerifyOrDie(apExchangeContext != nullptr);

Expand All @@ -83,8 +79,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
mExchangeCtx(*this), mManagementCallback(apCallback)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
Expand All @@ -93,41 +88,56 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mObserver = observer;
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionSessionEstablisher & helper)
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);
mSubscriptionId = helper.mSubscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = helper.mSubscriptionInfo.mMinInterval;
mMaxInterval = helper.mSubscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, helper.mSubscriptionInfo.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
// the object pool managed by the IM engine
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
for (size_t i = 0; i < helper.mSubscriptionInfo.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err =
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
AttributePathParams params = helper.mSubscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
for (size_t i = 0; i < helper.mSubscriptionInfo.mEventPaths.AllocatedSize(); i++)
{
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
EventPathParams params = helper.mSubscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}

// Ask IM engine to start CASE session with subscriber
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
mSessionHandle.Grab(sessionHandle);

SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*this);
}
// Notify the observer that a subscription has been resumed
mObserver->OnSubscriptionEstablished(this);

MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand Down Expand Up @@ -892,43 +902,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
SetStateFlag(aFlag, false);
}

void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);

_this->mSessionHandle.Grab(sessionHandle);

_this->SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = _this->mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*_this);
}
// Notify the observer that a subscription has been resumed
_this->mObserver->OnSubscriptionEstablished(_this);

_this->MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);
VerifyOrDie(_this != nullptr);

// TODO: Have a retry mechanism tied to wake interval for IC devices
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
err.Format());
_this->Close();
}

} // namespace app
} // namespace chip
34 changes: 11 additions & 23 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <app/MessageDef/EventPathIBs.h>
#include <app/ObjectList.h>
#include <app/OperationalSessionSetup.h>
#include <app/SubscriptionResumptionSessionEstablisher.h>
#include <app/SubscriptionResumptionStorage.h>
#include <lib/core/CHIPCallback.h>
#include <lib/core/CHIPCore.h>
Expand Down Expand Up @@ -253,6 +254,16 @@ class ReadHandler : public Messaging::ExchangeDelegate
return CHIP_NO_ERROR;
}

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Initialize a ReadHandler for a resumed subsciption
*
* Used after the SubscriptionResumptionSessionEstablisher establishs the CASE session
*/
void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionSessionEstablisher & sessionEstablisher);
#endif

private:
PriorityLevel GetCurrentPriority() const { return mCurrentPriority; }
EventNumber & GetEventMin() { return mEventMin; }
Expand Down Expand Up @@ -302,18 +313,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
*/
void OnInitialRequest(System::PacketBufferHandle && aPayload);

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Resume a persisted subscription
*
* Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session
* with the subscriber if one doesn't already exist, and send full priming report when connected.
*/
void ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
#endif

/**
* Send ReportData to initiator
*
Expand Down Expand Up @@ -485,11 +484,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
/// @param aFlag Flag to clear
void ClearStateFlag(ReadHandlerFlags aFlag);

// Helpers for continuing the subscription resumption
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle);
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);

AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);

// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
Expand Down Expand Up @@ -571,12 +565,6 @@ class ReadHandler : public Messaging::ExchangeDelegate

// TODO (#27675): Merge all observers into one and that one will dispatch the callbacks to the right place.
Observer * mObserver = nullptr;

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
#endif
};
} // namespace app
} // namespace chip
112 changes: 112 additions & 0 deletions src/app/SubscriptionResumptionSessionEstablisher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
*
* Copyright (c) 2023 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <app/InteractionModelEngine.h>
#include <app/SubscriptionResumptionSessionEstablisher.h>

namespace chip {
namespace app {
SubscriptionResumptionSessionEstablisher::SubscriptionResumptionSessionEstablisher() :
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{}

CHIP_ERROR
SubscriptionResumptionSessionEstablisher::ResumeSubscription(
CASESessionManager & caseSessionManager, const SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
{
mSubscriptionInfo.mNodeId = subscriptionInfo.mNodeId;
mSubscriptionInfo.mFabricIndex = subscriptionInfo.mFabricIndex;
mSubscriptionInfo.mSubscriptionId = subscriptionInfo.mSubscriptionId;
mSubscriptionInfo.mMinInterval = subscriptionInfo.mMinInterval;
mSubscriptionInfo.mMaxInterval = subscriptionInfo.mMaxInterval;
mSubscriptionInfo.mFabricFiltered = subscriptionInfo.mFabricFiltered;
// Copy the Attribute Paths and Event Paths
if (subscriptionInfo.mAttributePaths.AllocatedSize() > 0)
{
mSubscriptionInfo.mAttributePaths.Alloc(subscriptionInfo.mAttributePaths.AllocatedSize());
if (!mSubscriptionInfo.mAttributePaths.Get())
{
return CHIP_ERROR_NO_MEMORY;
}
for (size_t i = 0; i < mSubscriptionInfo.mAttributePaths.AllocatedSize(); ++i)
{
mSubscriptionInfo.mAttributePaths[i] = subscriptionInfo.mAttributePaths[i];
}
}
if (subscriptionInfo.mEventPaths.AllocatedSize() > 0)
{
mSubscriptionInfo.mEventPaths.Alloc(subscriptionInfo.mEventPaths.AllocatedSize());
if (!mSubscriptionInfo.mEventPaths.Get())
{
return CHIP_ERROR_NO_MEMORY;
}
for (size_t i = 0; i < mSubscriptionInfo.mEventPaths.AllocatedSize(); ++i)
{
mSubscriptionInfo.mEventPaths[i] = subscriptionInfo.mEventPaths[i];
}
}

ScopedNodeId peerNode = ScopedNodeId(mSubscriptionInfo.mNodeId, mSubscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
return CHIP_NO_ERROR;
}

void SubscriptionResumptionSessionEstablisher::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
SubscriptionResumptionSessionEstablisher * _this = static_cast<SubscriptionResumptionSessionEstablisher *>(context);
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = _this->mSubscriptionInfo;
InteractionModelEngine * imEngine = InteractionModelEngine::GetInstance();
if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, subscriptionInfo.mAttributePaths.AllocatedSize(),
subscriptionInfo.mEventPaths.AllocatedSize()))
{
ChipLogProgress(InteractionModel, "no resource for subscription resumption");
delete _this;
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
ReadHandler * readHandler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (readHandler == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
delete _this;
return;
}
readHandler->OnSubscriptionResumed(sessionHandle, *_this);
delete _this;
}

void SubscriptionResumptionSessionEstablisher::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId,
CHIP_ERROR error)
{
SubscriptionResumptionSessionEstablisher * _this = static_cast<SubscriptionResumptionSessionEstablisher *>(context);
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo = _this->mSubscriptionInfo;
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
error.Format());
// If the device fails to establish the session, the subscriber might be offline and its subscription read client will
// be deleted when the device reconnect to the subscriber. This subscription will be never used again. So clean up
// the persistent subscription information storage.
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Delete(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex,
subscriptionInfo.mSubscriptionId);
}
delete _this;
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace app
} // namespace chip
Loading
Loading