Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IM: Create ReadHandler after Session Establishment for Subscription Resumption #30491

Merged
merged 8 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,13 @@ static_library("app") {
]
}

if (chip_persist_subscriptions) {
sources += [
"SubscriptionResumptionHelper.cpp",
"SubscriptionResumptionHelper.h",
]
}

if (chip_enable_read_client) {
sources += [
"BufferedReadCallback.cpp",
Expand Down
17 changes: 6 additions & 11 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1853,26 +1853,21 @@ void InteractionModelEngine::ResumeSubscriptionsTimerCallback(System::Layer * ap
continue;
}

auto requestedAttributePathCount = subscriptionInfo.mAttributePaths.AllocatedSize();
auto requestedEventPathCount = subscriptionInfo.mEventPaths.AllocatedSize();
if (!imEngine->EnsureResourceForSubscription(subscriptionInfo.mFabricIndex, requestedAttributePathCount,
requestedEventPathCount))
auto subscriptionResumptionHelper = Platform::MakeUnique<SubscriptionResumptionHelper>();
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
if (subscriptionResumptionHelper == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for Subscription resumption");
ChipLogProgress(InteractionModel, "Failed to create SubscriptionResumptionHelper");
iterator->Release();
return;
}

ReadHandler * handler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (handler == nullptr)
if (subscriptionResumptionHelper->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo) != CHIP_NO_ERROR)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
ChipLogProgress(InteractionModel, "Failed to ResumeSubscription 0x%" PRIx32, subscriptionInfo.mSubscriptionId);
iterator->Release();
return;
}

ChipLogProgress(InteractionModel, "Resuming subscriptionId %" PRIu32, subscriptionInfo.mSubscriptionId);
handler->ResumeSubscription(*imEngine->mpCASESessionMgr, subscriptionInfo);
subscriptionResumptionHelper.release();
#if CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
resumedSubscriptions = true;
#endif // CHIP_CONFIG_SUBSCRIPTION_TIMEOUT_RESUMPTION
Expand Down
2 changes: 2 additions & 0 deletions src/app/InteractionModelEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <app/AppConfig.h>
#include <app/MessageDef/AttributeReportIBs.h>
#include <app/MessageDef/ReportDataMessage.h>
#include <app/SubscriptionResumptionHelper.h>
#include <lib/core/CHIPCore.h>
#include <lib/support/CodeUtils.h>
#include <lib/support/DLLUtil.h>
Expand Down Expand Up @@ -377,6 +378,7 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler,
friend class reporting::Engine;
friend class TestCommandInteraction;
friend class TestInteractionModelEngine;
friend class SubscriptionResumptionHelper;
using Status = Protocols::InteractionModel::Status;

void OnDone(CommandHandler & apCommandObj) override;
Expand Down
92 changes: 32 additions & 60 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon
InteractionType aInteractionType, Observer * observer) :
mExchangeCtx(*this),
mManagementCallback(apCallback)
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
,
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
#endif
{
VerifyOrDie(apExchangeContext != nullptr);

Expand All @@ -83,8 +79,7 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Messaging::ExchangeCon

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mExchangeCtx(*this), mManagementCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this),
mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
mExchangeCtx(*this), mManagementCallback(apCallback)
{
mInteractionType = InteractionType::Subscribe;
mFlags.ClearAll();
Expand All @@ -93,41 +88,56 @@ ReadHandler::ReadHandler(ManagementCallback & apCallback, Observer * observer) :
mObserver = observer;
}

void ReadHandler::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
void ReadHandler::OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionHelper & helper)
{
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinIntervalFloorSeconds = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, subscriptionInfo.mFabricFiltered);
mSubscriptionId = helper.mSubscriptionId;
mMinIntervalFloorSeconds = helper.mMinInterval;
mMaxInterval = helper.mMaxInterval;
SetStateFlag(ReadHandlerFlags::FabricFiltered, helper.mFabricFiltered);

// Move dynamically allocated attributes and events from the SubscriptionInfo struct into
// the object pool managed by the IM engine
for (size_t i = 0; i < subscriptionInfo.mAttributePaths.AllocatedSize(); i++)
for (size_t i = 0; i < helper.mAttributePaths.AllocatedSize(); i++)
{
AttributePathParams attributePathParams = subscriptionInfo.mAttributePaths[i].GetParams();
CHIP_ERROR err =
InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, attributePathParams);
AttributePathParams params = helper.mAttributePaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontAttributePathList(mpAttributePathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}
for (size_t i = 0; i < subscriptionInfo.mEventPaths.AllocatedSize(); i++)
for (size_t i = 0; i < helper.mEventPaths.AllocatedSize(); i++)
{
EventPathParams eventPathParams = subscriptionInfo.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, eventPathParams);
EventPathParams params = helper.mEventPaths[i].GetParams();
CHIP_ERROR err = InteractionModelEngine::GetInstance()->PushFrontEventPathParamsList(mpEventPathList, params);
if (err != CHIP_NO_ERROR)
{
Close();
return;
}
}

// Ask IM engine to start CASE session with subscriber
ScopedNodeId peerNode = ScopedNodeId(subscriptionInfo.mNodeId, subscriptionInfo.mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
mSessionHandle.Grab(sessionHandle);

SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*this);
}
// Notify the observer that a subscription has been resumed
mObserver->OnSubscriptionEstablished(this);

MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

#endif // CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
Expand Down Expand Up @@ -892,43 +902,5 @@ void ReadHandler::ClearStateFlag(ReadHandlerFlags aFlag)
SetStateFlag(aFlag, false);
}

void ReadHandler::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);

_this->mSessionHandle.Grab(sessionHandle);

_this->SetStateFlag(ReadHandlerFlags::ActiveSubscription);

auto * appCallback = _this->mManagementCallback.GetAppCallback();
if (appCallback)
{
appCallback->OnSubscriptionEstablished(*_this);
}
// Notify the observer that a subscription has been resumed
_this->mObserver->OnSubscriptionEstablished(_this);

_this->MoveToState(HandlerState::CanStartReporting);

ObjectList<AttributePathParams> * attributePath = _this->mpAttributePathList;
while (attributePath)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(attributePath->mValue);
attributePath = attributePath->mpNext;
}
}

void ReadHandler::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err)
{
ReadHandler * const _this = static_cast<ReadHandler *>(context);
VerifyOrDie(_this != nullptr);

// TODO: Have a retry mechanism tied to wake interval for IC devices
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
err.Format());
_this->Close();
}

} // namespace app
} // namespace chip
21 changes: 5 additions & 16 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <app/MessageDef/EventPathIBs.h>
#include <app/ObjectList.h>
#include <app/OperationalSessionSetup.h>
#include <app/SubscriptionResumptionHelper.h>
#include <app/SubscriptionResumptionStorage.h>
#include <lib/core/CHIPCallback.h>
#include <lib/core/CHIPCore.h>
Expand Down Expand Up @@ -305,13 +306,11 @@ class ReadHandler : public Messaging::ExchangeDelegate
#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
/**
*
* @brief Resume a persisted subscription
* @brief Initialize a ReadHandler for a resumed subsciption
*
* Used after ReadHandler(ManagementCallback & apCallback). This will start a CASE session
* with the subscriber if one doesn't already exist, and send full priming report when connected.
* Used after the SubscriptionResumptionHelper establishs the CASE session
*/
void ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);
void OnSubscriptionResumed(const SessionHandle & sessionHandle, SubscriptionResumptionHelper & helper);
#endif

/**
Expand Down Expand Up @@ -429,6 +428,7 @@ class ReadHandler : public Messaging::ExchangeDelegate
//
friend class chip::app::reporting::Engine;
friend class chip::app::InteractionModelEngine;
friend class chip::app::SubscriptionResumptionHelper;
wqx6 marked this conversation as resolved.
Show resolved Hide resolved

// The report scheduler needs to be able to access StateFlag private functions ShouldStartReporting(), CanStartReporting(),
// ForceDirtyState() and IsDirty() to know when to schedule a run so it is declared as a friend class.
Expand Down Expand Up @@ -485,11 +485,6 @@ class ReadHandler : public Messaging::ExchangeDelegate
/// @param aFlag Flag to clear
void ClearStateFlag(ReadHandlerFlags aFlag);

// Helpers for continuing the subscription resumption
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle);
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);

AttributePathExpandIterator mAttributePathExpandIterator = AttributePathExpandIterator(nullptr);

// The current generation of the reporting engine dirty set the last time we were notified that a path we're interested in was
Expand Down Expand Up @@ -571,12 +566,6 @@ class ReadHandler : public Messaging::ExchangeDelegate

// TODO (#27675): Merge all observers into one and that one will dispatch the callbacks to the right place.
Observer * mObserver = nullptr;

#if CHIP_CONFIG_PERSIST_SUBSCRIPTIONS
// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
#endif
};
} // namespace app
} // namespace chip
80 changes: 80 additions & 0 deletions src/app/SubscriptionResumptionHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
*
* Copyright (c) 2023 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <app/InteractionModelEngine.h>
#include <app/SubscriptionResumptionHelper.h>

namespace chip {
namespace app {
SubscriptionResumptionHelper::SubscriptionResumptionHelper() :
mOnConnectedCallback(HandleDeviceConnected, this), mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this)
{}

CHIP_ERROR SubscriptionResumptionHelper::ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo)
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
{
mNodeId = subscriptionInfo.mNodeId;
mFabricIndex = subscriptionInfo.mFabricIndex;
mSubscriptionId = subscriptionInfo.mSubscriptionId;
mMinInterval = subscriptionInfo.mMinInterval;
mMaxInterval = subscriptionInfo.mMaxInterval;
mFabricFiltered = subscriptionInfo.mFabricFiltered;
mAttributePaths = std::move(subscriptionInfo.mAttributePaths);
mEventPaths = std::move(subscriptionInfo.mEventPaths);
andy31415 marked this conversation as resolved.
Show resolved Hide resolved

ScopedNodeId peerNode = ScopedNodeId(mNodeId, mFabricIndex);
caseSessionManager.FindOrEstablishSession(peerNode, &mOnConnectedCallback, &mOnConnectionFailureCallback);
return CHIP_NO_ERROR;
}

void SubscriptionResumptionHelper::HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle)
{
Platform::UniquePtr<SubscriptionResumptionHelper> _this(static_cast<SubscriptionResumptionHelper *>(context));
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
InteractionModelEngine * imEngine = InteractionModelEngine::GetInstance();
if (!imEngine->EnsureResourceForSubscription(_this->mFabricIndex, _this->mAttributePaths.AllocatedSize(),
_this->mEventPaths.AllocatedSize()))
{
ChipLogProgress(InteractionModel, "no resource for subscription resumption");
return;
}
ReadHandler * readHandler = imEngine->mReadHandlers.CreateObject(*imEngine, imEngine->GetReportScheduler());
if (readHandler == nullptr)
{
ChipLogProgress(InteractionModel, "no resource for ReadHandler creation");
return;
}
readHandler->OnSubscriptionResumed(sessionHandle, *_this);
}

void SubscriptionResumptionHelper::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error)
{
Platform::UniquePtr<SubscriptionResumptionHelper> _this(static_cast<SubscriptionResumptionHelper *>(context));
ChipLogError(DataManagement, "Failed to establish CASE for subscription-resumption with error '%" CHIP_ERROR_FORMAT "'",
error.Format());
// If the device fails to establish the session, the subscriber might be offline and its subscription read client will
// be deleted after the device reconnect to the subscriber. This subscription will be never used again. So clean up
// the persistent subscription information storage.
auto * subscriptionResumptionStorage = InteractionModelEngine::GetInstance()->GetSubscriptionResumptionStorage();
if (subscriptionResumptionStorage)
{
subscriptionResumptionStorage->Delete(_this->mNodeId, _this->mFabricIndex, _this->mSubscriptionId);
}
}

} // namespace app
} // namespace chip
53 changes: 53 additions & 0 deletions src/app/SubscriptionResumptionHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
*
* Copyright (c) 2023 Project CHIP Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <app/AttributePathParams.h>
#include <app/CASESessionManager.h>
#include <app/SubscriptionResumptionStorage.h>

namespace chip {
namespace app {
class InteractionModelEngine;
class ReadHandler;

class SubscriptionResumptionHelper : private SubscriptionResumptionStorage::SubscriptionInfo
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
{
public:
SubscriptionResumptionHelper();

~SubscriptionResumptionHelper() {}

private:
wqx6 marked this conversation as resolved.
Show resolved Hide resolved
friend class InteractionModelEngine;
friend class ReadHandler;
wqx6 marked this conversation as resolved.
Show resolved Hide resolved

CHIP_ERROR ResumeSubscription(CASESessionManager & caseSessionManager,
SubscriptionResumptionStorage::SubscriptionInfo & subscriptionInfo);

// Callback funstions for continuing the subscription resumption
static void HandleDeviceConnected(void * context, Messaging::ExchangeManager & exchangeMgr,
const SessionHandle & sessionHandle);
static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error);

// Callbacks to handle server-initiated session success/failure
chip::Callback::Callback<OnDeviceConnected> mOnConnectedCallback;
chip::Callback::Callback<OnDeviceConnectionFailure> mOnConnectionFailureCallback;
};
} // namespace app
} // namespace chip
Loading