From c803dab4ec3ad97035e02332a86e057b58f5b7c5 Mon Sep 17 00:00:00 2001 From: Justin Wood Date: Wed, 27 Jul 2022 19:25:58 -0700 Subject: [PATCH] Establish CASE on re-subscription (#20080) (#21310) * Establish CASE on re-subscription This adds support for re-establishing CASE on re-subscription as a default policy implementation, with the application having the ability to over-ride that if needed. * Review feedback WIP * Further fixes * Added a Python test to validate re-subscription * Updated comment style * More minor cleanup * More build fixes * Update to master * Review feedback * Review Co-authored-by: Jerry Johns --- examples/bridge-app/esp32/main/main.cpp | 1 + src/app/BufferedReadCallback.h | 4 +- src/app/ClusterStateCache.h | 4 +- src/app/DeviceProxy.h | 1 - src/app/InteractionModelEngine.cpp | 22 +- src/app/InteractionModelEngine.h | 17 +- src/app/OperationalDeviceProxy.cpp | 9 +- src/app/ReadClient.cpp | 310 ++++++++++++------ src/app/ReadClient.h | 137 ++++++-- src/app/ReadHandler.cpp | 3 + src/app/ReadPrepareParams.h | 22 +- .../operational-credentials-server.cpp | 1 + src/app/server/Server.cpp | 6 +- src/app/tests/BUILD.gn | 1 + .../tests/integration/chip_im_initiator.cpp | 1 - .../interaction_model/InteractionModel.cpp | 2 - .../interaction_model/InteractionModel.h | 2 +- .../CHIPDeviceControllerFactory.cpp | 5 +- src/controller/TypedReadCallback.h | 16 +- src/controller/java/AndroidCallbacks.cpp | 19 +- src/controller/java/AndroidCallbacks.h | 2 +- .../python/chip/clusters/Attribute.py | 44 ++- .../python/chip/clusters/attribute.cpp | 16 +- .../python/test/test_scripts/base.py | 46 +++ .../test/test_scripts/mobile-device-test.py | 4 + src/controller/tests/TestEventChunking.cpp | 2 - src/controller/tests/TestReadChunking.cpp | 3 - src/controller/tests/data_model/TestRead.cpp | 138 +++++--- 28 files changed, 593 insertions(+), 245 deletions(-) diff --git a/examples/bridge-app/esp32/main/main.cpp b/examples/bridge-app/esp32/main/main.cpp index 17f0e15568e768..4f86e1872bcefa 100644 --- a/examples/bridge-app/esp32/main/main.cpp +++ b/examples/bridge-app/esp32/main/main.cpp @@ -35,6 +35,7 @@ #include #include +#include #include #if CONFIG_ENABLE_ESP32_FACTORY_DATA_PROVIDER diff --git a/src/app/BufferedReadCallback.h b/src/app/BufferedReadCallback.h index ea7dd28c4dbe5a..6b2d31d2b19a1e 100644 --- a/src/app/BufferedReadCallback.h +++ b/src/app/BufferedReadCallback.h @@ -86,9 +86,9 @@ class BufferedReadCallback : public ReadClient::Callback mCallback.OnSubscriptionEstablished(aSubscriptionId); } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { - mCallback.OnResubscriptionAttempt(aTerminationCause, aNextResubscribeIntervalMsec); + return mCallback.OnResubscriptionNeeded(apReadClient, aTerminationCause); } void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override diff --git a/src/app/ClusterStateCache.h b/src/app/ClusterStateCache.h index b16fd58b1ee012..8ba58f1d923798 100644 --- a/src/app/ClusterStateCache.h +++ b/src/app/ClusterStateCache.h @@ -576,9 +576,9 @@ class ClusterStateCache : protected ReadClient::Callback mCallback.OnSubscriptionEstablished(aSubscriptionId); } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { - mCallback.OnResubscriptionAttempt(aTerminationCause, aNextResubscribeIntervalMsec); + return mCallback.OnResubscriptionNeeded(apReadClient, aTerminationCause); } void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override diff --git a/src/app/DeviceProxy.h b/src/app/DeviceProxy.h index ce9a3909561ebc..3315576065b6ed 100644 --- a/src/app/DeviceProxy.h +++ b/src/app/DeviceProxy.h @@ -27,7 +27,6 @@ #pragma once #include -#include #include #include #include diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index d6dbc2058836b6..e4f76d46d113b5 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -43,13 +43,17 @@ InteractionModelEngine * InteractionModelEngine::GetInstance() return &sInteractionModelEngine; } -CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable) +CHIP_ERROR InteractionModelEngine::Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable, + CASESessionManager * apCASESessionMgr) { - mpExchangeMgr = apExchangeMgr; - mpFabricTable = apFabricTable; + VerifyOrReturnError(apFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT); + VerifyOrReturnError(apExchangeMgr != nullptr, CHIP_ERROR_INVALID_ARGUMENT); + + mpExchangeMgr = apExchangeMgr; + mpFabricTable = apFabricTable; + mpCASESessionMgr = apCASESessionMgr; ReturnErrorOnFailure(mpExchangeMgr->RegisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id, this)); - VerifyOrReturnError(mpFabricTable != nullptr, CHIP_ERROR_INVALID_ARGUMENT); mReportingEngine.Init(); mMagic++; @@ -122,6 +126,16 @@ void InteractionModelEngine::Shutdown() mEventPathPool.ReleaseAll(); mDataVersionFilterPool.ReleaseAll(); mpExchangeMgr->UnregisterUnsolicitedMessageHandlerForProtocol(Protocols::InteractionModel::Id); + + mpCASESessionMgr = nullptr; + + // + // We _should_ be clearing these out, but doing so invites a world + // of trouble. #21233 tracks fixing the underlying assumptions to make + // this possible. + // + // mpFabricTable = nullptr; + // mpExchangeMgr = nullptr; } uint32_t InteractionModelEngine::GetNumActiveReadHandlers() const diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index e5e414bc746a9f..208b88d29bfa60 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -59,6 +59,8 @@ #include #include +#include + namespace chip { namespace app { @@ -102,17 +104,26 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, * Initialize the InteractionModel Engine. * * @param[in] apExchangeMgr A pointer to the ExchangeManager object. + * @param[in] apFabricTable A pointer to the FabricTable object. + * @param[in] apCASESessionMgr An optional pointer to a CASESessionManager (used for re-subscriptions). * * @retval #CHIP_ERROR_INCORRECT_STATE If the state is not equal to * kState_NotInitialized. * @retval #CHIP_NO_ERROR On success. * */ - CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable); + CHIP_ERROR Init(Messaging::ExchangeManager * apExchangeMgr, FabricTable * apFabricTable, + CASESessionManager * apCASESessionMgr = nullptr); void Shutdown(); - Messaging::ExchangeManager * GetExchangeManager(void) const { return mpExchangeMgr; }; + Messaging::ExchangeManager * GetExchangeManager(void) const { return mpExchangeMgr; } + + /** + * Returns a pointer to the CASESessionManager. This can return nullptr if one wasn't + * provided in the call to Init(). + */ + CASESessionManager * GetCASESessionManager() const { return mpCASESessionMgr; } /** * Tears down an active subscription. @@ -551,6 +562,8 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, FabricTable * mpFabricTable; + CASESessionManager * mpCASESessionMgr = nullptr; + // A magic number for tracking values between stack Shutdown()-s and Init()-s. // An ObjectHandle is valid iff. its magic equals to this one. uint32_t mMagic = 0; diff --git a/src/app/OperationalDeviceProxy.cpp b/src/app/OperationalDeviceProxy.cpp index 5852f727a65ae0..45386ca6626995 100644 --- a/src/app/OperationalDeviceProxy.cpp +++ b/src/app/OperationalDeviceProxy.cpp @@ -24,12 +24,11 @@ * messages to and from the corresponding CHIP devices. */ -#include "OperationalDeviceProxy.h" +#include -#include "CASEClient.h" -#include "CommandSender.h" -#include "ReadPrepareParams.h" -#include "transport/SecureSession.h" +#include +#include +#include #include #include diff --git a/src/app/ReadClient.cpp b/src/app/ReadClient.cpp index ce4056161c2382..8d581db39bf319 100644 --- a/src/app/ReadClient.cpp +++ b/src/app/ReadClient.cpp @@ -34,48 +34,11 @@ namespace chip { namespace app { -/** - * @brief The default resubscribe policy will pick a random timeslot - * with millisecond resolution over an ever increasing window, - * following a fibonacci sequence up to CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX, - * Average of the randomized wait time past the CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX - * will be around one hour. - * When the retry count resets to 0, the sequence starts from the beginning again. - */ -static void DefaultResubscribePolicy(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec, - bool & aShouldResubscribe) -{ - uint32_t maxWaitTimeInMsec = 0; - uint32_t waitTimeInMsec = 0; - uint32_t minWaitTimeInMsec = 0; - - if (aNumCumulativeRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) - { - maxWaitTimeInMsec = GetFibonacciForIndex(aNumCumulativeRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; - } - else - { - maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; - } - - if (maxWaitTimeInMsec != 0) - { - minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; - waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); - } - - aNextSubscriptionIntervalMsec = waitTimeInMsec; - aShouldResubscribe = true; - ChipLogProgress(DataManagement, - "Computing Resubscribe policy: attempts %" PRIu32 ", max wait time %" PRIu32 " ms, selected wait time %" PRIu32 - " ms", - aNumCumulativeRetries, maxWaitTimeInMsec, waitTimeInMsec); -} - ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback, InteractionType aInteractionType) : mExchange(*this), - mpCallback(apCallback) + mpCallback(apCallback), mOnConnectedCallback(HandleDeviceConnected, this), + mOnConnectionFailureCallback(HandleDeviceConnectionFailure, this) { // Error if already initialized. mpExchangeMgr = apExchangeMgr; @@ -103,7 +66,7 @@ void ReadClient::ClearActiveSubscriptionState() void ReadClient::StopResubscription() { - ClearActiveSubscriptionState(); + CancelLivenessCheckTimer(); CancelResubscribeTimer(); mpCallback.OnDeallocatePaths(std::move(mReadPrepareParams)); @@ -114,6 +77,8 @@ ReadClient::~ReadClient() if (IsSubscriptionType()) { CancelLivenessCheckTimer(); + CancelResubscribeTimer(); + // // Only remove ourselves from the engine's tracker list if we still continue to have a valid pointer to it. // This won't be the case if the engine shut down before this destructor was called (in which case, mpImEngine @@ -126,7 +91,55 @@ ReadClient::~ReadClient() } } -void ReadClient::Close(CHIP_ERROR aError) +uint32_t ReadClient::ComputeTimeTillNextSubscription() +{ + uint32_t maxWaitTimeInMsec = 0; + uint32_t waitTimeInMsec = 0; + uint32_t minWaitTimeInMsec = 0; + + if (mNumRetries <= CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX) + { + maxWaitTimeInMsec = GetFibonacciForIndex(mNumRetries) * CHIP_RESUBSCRIBE_WAIT_TIME_MULTIPLIER_MS; + } + else + { + maxWaitTimeInMsec = CHIP_RESUBSCRIBE_MAX_RETRY_WAIT_INTERVAL_MS; + } + + if (maxWaitTimeInMsec != 0) + { + minWaitTimeInMsec = (CHIP_RESUBSCRIBE_MIN_WAIT_TIME_INTERVAL_PERCENT_PER_STEP * maxWaitTimeInMsec) / 100; + waitTimeInMsec = minWaitTimeInMsec + (Crypto::GetRandU32() % (maxWaitTimeInMsec - minWaitTimeInMsec)); + } + + return waitTimeInMsec; +} + +CHIP_ERROR ReadClient::ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, + bool aReestablishCASE) +{ + VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); + + // + // If we're establishing CASE, make sure we are not provided a new SessionHandle as well. + // + VerifyOrReturnError(!aReestablishCASE || !aNewSessionHandle.HasValue(), CHIP_ERROR_INVALID_ARGUMENT); + + if (aNewSessionHandle.HasValue()) + { + mReadPrepareParams.mSessionHolder.Grab(aNewSessionHandle.Value()); + } + + mDoCaseOnNextResub = aReestablishCASE; + + ReturnErrorOnFailure( + InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( + System::Clock::Milliseconds32(aTimeTillNextResubscriptionMs), OnResubscribeTimerCallback, this)); + + return CHIP_NO_ERROR; +} + +void ReadClient::Close(CHIP_ERROR aError, bool allowResubscription) { if (IsReadType()) { @@ -139,20 +152,31 @@ void ReadClient::Close(CHIP_ERROR aError) { if (aError != CHIP_NO_ERROR) { - uint32_t nextResubscribeMsec = 0; - - if (ResubscribeIfNeeded(nextResubscribeMsec)) + ClearActiveSubscriptionState(); + + // + // We infer that re-subscription was requested by virtue of having a non-zero list of event OR attribute paths present + // in mReadPrepareParams. This would only be the case if an application called SendAutoResubscribeRequest which + // populates mReadPrepareParams with the values provided by the application. + // + if (allowResubscription && + (mReadPrepareParams.mEventPathParamsListSize != 0 || mReadPrepareParams.mAttributePathParamsListSize != 0)) { - ChipLogProgress(DataManagement, - "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 - "ms due to error %" CHIP_ERROR_FORMAT, - mFabricIndex, ChipLogValueX64(mPeerNodeId), mNumRetries, nextResubscribeMsec, aError.Format()); - mpCallback.OnResubscriptionAttempt(aError, nextResubscribeMsec); - ClearActiveSubscriptionState(); - return; + aError = mpCallback.OnResubscriptionNeeded(this, aError); + if (aError == CHIP_NO_ERROR) + { + return; + } } + + // + // Either something bad happened when requesting resubscription or the application has decided to not + // continue by returning an error. Let's convey the error back up to the application + // and shut everything down. + // mpCallback.OnError(aError); } + StopResubscription(); } @@ -288,9 +312,7 @@ CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams) ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf), Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse))); - mPeerNodeId = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeerNodeId(); - mFabricIndex = aReadPrepareParams.mSessionHolder->GetFabricIndex(); - + mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer(); MoveToState(ClientState::AwaitingInitialReport); return CHIP_NO_ERROR; @@ -535,8 +557,13 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload) { MoveToState(ClientState::AwaitingSubscribeResponse); } - else + else if (IsSubscriptionActive()) { + // + // Only refresh the liveness check timer if we've successfully established + // a subscription and have a valid value for mMaxInterval which the function + // relies on. + // RefreshLivenessCheckTimer(); } } @@ -620,7 +647,8 @@ CHIP_ERROR ReadClient::ProcessAttributeReportIBs(TLV::TLVReader & aAttributeRepo DataVersion version = 0; ReturnErrorOnFailure(data.GetDataVersion(&version)); attributePath.mDataVersion.SetValue(version); - if (mReadPrepareParams.mResubscribePolicy != nullptr) + + if (mReadPrepareParams.mpDataVersionFilterList != nullptr) { UpdateDataVersionFilters(attributePath); } @@ -671,10 +699,12 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea ReturnErrorOnFailure(data.GetData(&dataReader)); - if (mReadPrepareParams.mResubscribePolicy != nullptr) - { - mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1); - } + // + // Update the event number being tracked in mReadPrepareParams in case + // we want to send it in the next SubscribeRequest message to convey + // the event number for which we have already received an event. + // + mReadPrepareParams.mEventNumber.SetValue(header.mEventNumber + 1); NoteReportingData(); mpCallback.OnEventData(header, &dataReader, nullptr); @@ -703,22 +733,37 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea return err; } +void ReadClient::OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout) +{ + mLivenessTimeoutOverride = aLivenessTimeout; + RefreshLivenessCheckTimer(); +} + CHIP_ERROR ReadClient::RefreshLivenessCheckTimer() { CHIP_ERROR err = CHIP_NO_ERROR; + VerifyOrReturnError(mState == ClientState::SubscriptionActive, CHIP_ERROR_INCORRECT_STATE); + CancelLivenessCheckTimer(); - VerifyOrReturnError(mExchange, CHIP_ERROR_INCORRECT_STATE); - VerifyOrReturnError(mExchange->HasSessionHandle(), CHIP_ERROR_INCORRECT_STATE); + System::Clock::Timeout timeout; - System::Clock::Timeout timeout = System::Clock::Seconds16(mMaxInterval) + mExchange->GetSessionHandle()->GetAckTimeout(); + if (mLivenessTimeoutOverride != System::Clock::kZero) + { + timeout = mLivenessTimeoutOverride; + } + else + { + VerifyOrReturnError(mReadPrepareParams.mSessionHolder, CHIP_ERROR_INCORRECT_STATE); + timeout = System::Clock::Seconds16(mMaxInterval) + mReadPrepareParams.mSessionHolder->GetAckTimeout(); + } // EFR32/MBED/INFINION/K32W's chrono count return long unsinged, but other platform returns unsigned - ChipLogProgress(DataManagement, - "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 - " Peer = %02x:" ChipLogFormatX64, - static_cast(timeout.count()), mSubscriptionId, mFabricIndex, ChipLogValueX64(mPeerNodeId)); + ChipLogProgress( + DataManagement, + "Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32 " Peer = %02x:" ChipLogFormatX64, + static_cast(timeout.count()), mSubscriptionId, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId())); err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( timeout, OnLivenessTimeoutCallback, this); @@ -755,7 +800,7 @@ void ReadClient::OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * ChipLogError(DataManagement, "Subscription Liveness timeout with SubscriptionID = 0x%08" PRIx32 ", Peer = %02x:" ChipLogFormatX64, - _this->mSubscriptionId, _this->mFabricIndex, ChipLogValueX64(_this->mPeerNodeId)); + _this->mSubscriptionId, _this->GetFabricIndex(), ChipLogValueX64(_this->GetPeerNodeId())); // TODO: add a more specific error here for liveness timeout failure to distinguish between other classes of timeouts (i.e // response timeouts). @@ -782,7 +827,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP ChipLogProgress(DataManagement, "Subscription established with SubscriptionID = 0x%08" PRIx32 " MinInterval = %u" "s MaxInterval = %us Peer = %02x:" ChipLogFormatX64, - mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, mFabricIndex, ChipLogValueX64(mPeerNodeId)); + mSubscriptionId, mMinIntervalFloorSeconds, mMaxInterval, GetFabricIndex(), ChipLogValueX64(GetPeerNodeId())); ReturnErrorOnFailure(subscribeResponse.ExitContainer()); @@ -790,10 +835,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP mpCallback.OnSubscriptionEstablished(subscriptionId); - if (mReadPrepareParams.mResubscribePolicy != nullptr) - { - mNumRetries = 0; - } + mNumRetries = 0; RefreshLivenessCheckTimer(); @@ -803,12 +845,7 @@ CHIP_ERROR ReadClient::ProcessSubscribeResponse(System::PacketBufferHandle && aP CHIP_ERROR ReadClient::SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams) { mReadPrepareParams = std::move(aReadPrepareParams); - if (mReadPrepareParams.mResubscribePolicy == nullptr) - { - mReadPrepareParams.mResubscribePolicy = DefaultResubscribePolicy; - } - - CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams); + CHIP_ERROR err = SendSubscribeRequest(mReadPrepareParams); if (err != CHIP_NO_ERROR) { StopResubscription(); @@ -836,6 +873,9 @@ CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadP Span dataVersionFilters(aReadPrepareParams.mpDataVersionFilterList, aReadPrepareParams.mDataVersionFilterListSize); + VerifyOrReturnError(aReadPrepareParams.mAttributePathParamsListSize != 0 || aReadPrepareParams.mEventPathParamsListSize != 0, + CHIP_ERROR_INVALID_ARGUMENT); + System::PacketBufferHandle msgBuf; System::PacketBufferTLVWriter writer; SubscribeRequestMessage::Builder request; @@ -914,48 +954,108 @@ CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadP ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf), Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse))); - mPeerNodeId = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeerNodeId(); - mFabricIndex = aReadPrepareParams.mSessionHolder->GetFabricIndex(); - + mPeer = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeer(); MoveToState(ClientState::AwaitingInitialReport); return CHIP_NO_ERROR; } -void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState) +CHIP_ERROR ReadClient::DefaultResubscribePolicy(CHIP_ERROR aTerminationCause) { - ReadClient * const _this = reinterpret_cast(apAppState); - assert(_this != nullptr); - _this->SendSubscribeRequest(_this->mReadPrepareParams); - _this->mNumRetries++; + VerifyOrReturnError(IsIdle(), CHIP_ERROR_INCORRECT_STATE); + + auto timeTillNextResubscription = ComputeTimeTillNextSubscription(); + ChipLogProgress(DataManagement, + "Will try to resubscribe to %02x:" ChipLogFormatX64 " at retry index %" PRIu32 " after %" PRIu32 + "ms due to error %" CHIP_ERROR_FORMAT, + GetFabricIndex(), ChipLogValueX64(GetPeerNodeId()), mNumRetries, timeTillNextResubscription, + aTerminationCause.Format()); + ReturnErrorOnFailure(ScheduleResubscription(timeTillNextResubscription, NullOptional, aTerminationCause == CHIP_ERROR_TIMEOUT)); + return CHIP_NO_ERROR; } -bool ReadClient::ResubscribeIfNeeded(uint32_t & aNextResubscribeIntervalMsec) +void ReadClient::HandleDeviceConnected(void * context, OperationalDeviceProxy * device) { - bool shouldResubscribe = true; - uint32_t intervalMsec = 0; - aNextResubscribeIntervalMsec = 0; - if (mReadPrepareParams.mResubscribePolicy == nullptr) + ReadClient * const _this = static_cast(context); + VerifyOrDie(_this != nullptr); + + ChipLogProgress(DataManagement, "HandleDeviceConnected %d\n", device->GetSecureSession().HasValue()); + _this->mReadPrepareParams.mSessionHolder.Grab(device->GetSecureSession().Value()); + + auto err = _this->SendSubscribeRequest(_this->mReadPrepareParams); + if (err != CHIP_NO_ERROR) { - ChipLogDetail(DataManagement, "mResubscribePolicy is null"); - return false; + _this->Close(err); } - mReadPrepareParams.mResubscribePolicy(mNumRetries, intervalMsec, shouldResubscribe); - if (!shouldResubscribe) +} + +void ReadClient::HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR err) +{ + ReadClient * const _this = static_cast(context); + VerifyOrDie(_this != nullptr); + + ChipLogError(DataManagement, "Failed to establish CASE for re-subscription with error '%" CHIP_ERROR_FORMAT "'", err.Format()); + + _this->Close(err); +} + +void ReadClient::OnResubscribeTimerCallback(System::Layer * apSystemLayer, void * apAppState) +{ + ReadClient * const _this = static_cast(apAppState); + VerifyOrDie(_this != nullptr); + + CHIP_ERROR err; + + ChipLogProgress(DataManagement, "OnResubscribeTimerCallback: DoCASE = %d", _this->mDoCaseOnNextResub); + _this->mNumRetries++; + + if (_this->mDoCaseOnNextResub) { - ChipLogProgress(DataManagement, "Resubscribe has been stopped"); - return false; + auto * caseSessionManager = InteractionModelEngine::GetInstance()->GetCASESessionManager(); + VerifyOrExit(caseSessionManager != nullptr, err = CHIP_ERROR_INCORRECT_STATE); + + // + // We need to mark our session as defunct explicitly since the assessment of a liveness failure + // is usually triggered by the absence of any exchange activity that would have otherwise + // automatically marked the session as defunct on a response timeout. + // + // Doing so will ensure that the subsequent call to FindOrEstablishSession will not bind to + // an existing established session but rather trigger establishing a new one. + // + if (_this->mReadPrepareParams.mSessionHolder) + { + _this->mReadPrepareParams.mSessionHolder->AsSecureSession()->MarkAsDefunct(); + } + + // + // TODO: Until #19259 is merged, we cannot actually just get by with the above logic since marking sessions + // defunct has no effect on resident OperationalDeviceProxy instances that are already bound + // to a now-defunct CASE session. + // + auto proxy = caseSessionManager->FindExistingSession(_this->mPeer); + if (proxy != nullptr) + { + proxy->Disconnect(); + } + + caseSessionManager->FindOrEstablishSession(_this->mPeer, &_this->mOnConnectedCallback, + &_this->mOnConnectionFailureCallback); + return; } - CHIP_ERROR err = InteractionModelEngine::GetInstance()->GetExchangeManager()->GetSessionManager()->SystemLayer()->StartTimer( - System::Clock::Milliseconds32(intervalMsec), OnResubscribeTimerCallback, this); + + err = _this->SendSubscribeRequest(_this->mReadPrepareParams); + +exit: if (err != CHIP_NO_ERROR) { - ChipLogError(DataManagement, "Fail to resubscribe with error %" CHIP_ERROR_FORMAT, err.Format()); - return false; + // + // Call Close (which should trigger re-subscription again) EXCEPT if we got here because we didn't have a valid + // CASESessionManager pointer when mDoCaseOnNextResub was true. + // + // In that case, don't permit re-subscription to occur. + // + _this->Close(err, err != CHIP_ERROR_INCORRECT_STATE); } - - aNextResubscribeIntervalMsec = intervalMsec; - return true; } void ReadClient::UpdateDataVersionFilters(const ConcreteDataAttributePath & aPath) diff --git a/src/app/ReadClient.h b/src/app/ReadClient.h index 0c55d1953371e0..78291fa5fddc7a 100644 --- a/src/app/ReadClient.h +++ b/src/app/ReadClient.h @@ -23,6 +23,7 @@ */ #pragma once +#include "system/SystemClock.h" #include #include #include @@ -32,8 +33,10 @@ #include #include #include +#include #include #include +#include #include #include #include @@ -54,8 +57,12 @@ class InteractionModelEngine; /** * @class ReadClient * - * @brief The read client represents the initiator side of a Read Interaction, and is responsible - * for generating one Read Request for a particular set of attributes and/or events, and handling the Report Data response. + * @brief The read client represents the initiator side of a Read Or Subscribe Interaction (depending on the APIs invoked). + * + * When used to manage subscriptions, the client provides functionality to automatically re-subscribe as needed, + * including re-establishing CASE under certain conditions (see Callback::OnResubscriptionNeeded for more info). + * This is the default behavior. A consumer can completely opt-out of this behavior by over-riding + * Callback::OnResubscriptionNeeded and providing an alternative implementation. * */ class ReadClient : public Messaging::ExchangeDelegate @@ -131,14 +138,30 @@ class ReadClient : public Messaging::ExchangeDelegate virtual void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) {} /** - * OnResubscriptionAttempt will be called when a re-subscription has been scheduled as a result of the termination of an - * in-progress or previously active subscription. This object MUST continue to exist after this call is completed. The + * OnResubscriptionNeeded will be called when a subscription that was started with SendAutoResubscribeRequest has terminated + * and re-subscription is needed. The termination cause is provided to help inform subsequent re-subscription logic. + * + * The base implementation automatically re-subscribes at appropriate intervals taking the termination cause into account + * (see ReadClient::DefaultResubscribePolicy for more details). If the default implementation doesn't suffice, the logic of + * ReadClient::DefaultResubscribePolicy is broken down into its constituent methods that are publicly available for + * applications to call and sequence. + * + * If the method is over-ridden, it's the application's responsibility to take the appropriate steps needed to eventually + * call-back into the ReadClient object to schedule a re-subscription (by invoking ReadClient::ScheduleResubscription). + * + * If the application DOES NOT want re-subscription to happen on a particular invocation of this method, returning anything + * other than CHIP_NO_ERROR will terminate the interaction and result in OnError, OnDeallocatePaths and OnDone being called + * in that sequence. + * + * This object MUST continue to exist after this call is completed. The * application shall wait until it receives an OnDone call to destroy the object. * * @param[in] aTerminationCause The cause of failure of the subscription that just terminated. - * @param[in] aNextResubscribeIntervalMsec How long we will wait before trying to auto-resubscribe. */ - virtual void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) {} + virtual CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) + { + return apReadClient->DefaultResubscribePolicy(aTerminationCause); + } /** * OnError will be called when an error occurs *after* a successful call to SendRequest(). The following @@ -278,8 +301,8 @@ class ReadClient : public Messaging::ExchangeDelegate return mInteractionType == InteractionType::Subscribe ? returnType(mSubscriptionId) : returnType::Missing(); } - FabricIndex GetFabricIndex() const { return mFabricIndex; } - NodeId GetPeerNodeId() const { return mPeerNodeId; } + FabricIndex GetFabricIndex() const { return mPeer.GetFabricIndex(); } + NodeId GetPeerNodeId() const { return mPeer.GetNodeId(); } bool IsReadType() { return mInteractionType == InteractionType::Read; } bool IsSubscriptionType() const { return mInteractionType == InteractionType::Subscribe; }; @@ -301,21 +324,59 @@ class ReadClient : public Messaging::ExchangeDelegate ReadClient * GetNextClient() { return mpNext; } void SetNextClient(ReadClient * apClient) { mpNext = apClient; } - // Like SendSubscribeRequest, but the ReadClient will automatically attempt to re-establish the subscription if - // we decide that the subscription has dropped. The exact behavior of the re-establishment can be controlled - // by setting mResubscribePolicy in the ReadPrepareParams. If not set, a default behavior with exponential backoff will be - // used. - // - // The application has to know to - // a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList and - // mpDataVersionFilterList with lifetimes as long as the ReadClient itself and b) free those up later in the call to - // OnDeallocatePaths. Note: At a given time in the system, you can either have a single subscription with re-sub enabled that - // that has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not have - // a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call - // OnDeallocatePaths when OnDone is called. SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since - // that's the only case when the consumer moved a ReadParams into the client. + /** + * Like SendSubscribeRequest, but the ReadClient will automatically attempt to re-establish the subscription if + * we decide that the subscription has dropped. The exact behavior of the re-establishment can be controlled + * by setting mResubscribePolicy in the ReadPrepareParams. If not set, a default behavior with exponential backoff will be + * used. + * + * The application has to know to + * a) allocate a ReadPrepareParams object that will have fields mpEventPathParamsList and mpAttributePathParamsList and + * mpDataVersionFilterList with lifetimes as long as the ReadClient itself and b) free those up later in the call to + * OnDeallocatePaths. Note: At a given time in the system, you can either have a single subscription with re-sub enabled that + * has mKeepSubscriptions = false, OR, multiple subs with re-sub enabled with mKeepSubscriptions = true. You shall not + * have a mix of both simultaneously. If SendAutoResubscribeRequest is called at all, it guarantees that it will call + * OnDeallocatePaths when OnDone is called. SendAutoResubscribeRequest is the only case that calls OnDeallocatePaths, since + * that's the only case when the consumer moved a ReadParams into the client. + * + */ CHIP_ERROR SendAutoResubscribeRequest(ReadPrepareParams && aReadPrepareParams); + /** + * This provides a standard re-subscription policy implementation that given a termination cause, does the following: + * - Calculates the time till next subscription with fibonacci back-off (implemented by ComputeTimeTillNextSubscription()). + * - Schedules the next subscription attempt at the computed interval from the previous step. Operational discovery and + * CASE establishment will be attempted if aTerminationCause was CHIP_ERROR_TIMEOUT. In all other cases, it will attempt + * to re-use a previously established session. + */ + CHIP_ERROR DefaultResubscribePolicy(CHIP_ERROR aTerminationCause); + + /** + * Computes the time, in milliseconds, until the next re-subscription over + * an ever increasing window following a fibonacci sequence with the current retry count + * used as input to the fibonacci algorithm. + * + * CHIP_RESUBSCRIBE_MAX_FIBONACCI_STEP_INDEX is the maximum value the retry count can tick up to. + * + */ + uint32_t ComputeTimeTillNextSubscription(); + + /** + * Schedules a re-subscription aTimeTillNextResubscriptionMs into the future. + * + * If an application wants to set up CASE on their own, they should call ComputeTimeTillNextSubscription() to compute the next + * interval at which they should attempt CASE and attempt CASE at that time. On successful CASE establishment, this method + * should be called with the new SessionHandle provided through 'aNewSessionHandle', 'aTimeTillNextResubscriptionMs' set to 0 + * (i.e async, but as soon as possible) and 'aReestablishCASE' set to false. + * + * Otherwise, if aReestablishCASE is true, operational discovery and CASE will be attempted at that time before + * the actual IM interaction is initiated. + * + * aReestablishCASE SHALL NOT be set to true if a valid SessionHandle is provided through newSessionHandle. + */ + CHIP_ERROR ScheduleResubscription(uint32_t aTimeTillNextResubscriptionMs, Optional aNewSessionHandle, + bool aReestablishCASE); + // Like SendSubscribeRequest, but allows sending certain forms of invalid // subscribe requests that servers are expected to reject, for testing // purposes. Should only be called from tests. @@ -326,6 +387,18 @@ class ReadClient : public Messaging::ExchangeDelegate } #endif // CONFIG_BUILD_FOR_HOST_UNIT_TEST + /** + * Override the interval at which liveness of the subscription is assessed. + * By default, this is set set to the max interval of the subscription + ACK timeout of the underlying session. + * + * This can be only be called once a subscription has been established and is active. Once called, this will cancel any existing + * liveness timers and schedule a new one. + * + * This can be called from the Callback::OnSubscriptionEstablished callback. + * + */ + void OverrideLivenessTimeout(System::Clock::Timeout aLivenessTimeout); + private: friend class TestReadInteraction; friend class InteractionModelEngine; @@ -405,13 +478,19 @@ class ReadClient : public Messaging::ExchangeDelegate * exchange and finally, signal to the application that it's * safe to release this object. * - * If aError != CHIP_NO_ERROR, it is delivered to the application through the OnError callback first. + * If aError != CHIP_NO_ERROR, this will trigger re-subscriptions if allowResubscription is true + * AND if this ReadClient instance is tracking a subscription AND the applications decides to do so + * in their implementation of Callback::OnResubscriptionNeeded(). * */ - void Close(CHIP_ERROR aError); + void Close(CHIP_ERROR aError, bool allowResubscription = true); void StopResubscription(); void ClearActiveSubscriptionState(); + + static void HandleDeviceConnected(void * context, OperationalDeviceProxy * device); + static void HandleDeviceConnectionFailure(void * context, const ScopedNodeId & peerId, CHIP_ERROR error); + CHIP_ERROR GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional & aEventMin); Messaging::ExchangeManager * mpExchangeMgr = nullptr; @@ -425,16 +504,22 @@ class ReadClient : public Messaging::ExchangeDelegate uint16_t mMinIntervalFloorSeconds = 0; uint16_t mMaxInterval = 0; SubscriptionId mSubscriptionId = 0; - NodeId mPeerNodeId = kUndefinedNodeId; - FabricIndex mFabricIndex = kUndefinedFabricIndex; - InteractionType mInteractionType = InteractionType::Read; + ScopedNodeId mPeer; + InteractionType mInteractionType = InteractionType::Read; Timestamp mEventTimestamp; + bool mDoCaseOnNextResub = true; + + chip::Callback::Callback mOnConnectedCallback; + chip::Callback::Callback mOnConnectionFailureCallback; + ReadClient * mpNext = nullptr; InteractionModelEngine * mpImEngine = nullptr; ReadPrepareParams mReadPrepareParams; uint32_t mNumRetries = 0; + System::Clock::Timeout mLivenessTimeoutOverride = System::Clock::kZero; + // End Of Container (0x18) uses one byte. static constexpr uint16_t kReservedSizeForEndOfContainer = 1; // Reserved size for the uint8_t InteractionModelRevision flag, which takes up 1 byte for the control tag and 1 byte for the diff --git a/src/app/ReadHandler.cpp b/src/app/ReadHandler.cpp index 6c2230b2012dd0..e40d8bf007213f 100644 --- a/src/app/ReadHandler.cpp +++ b/src/app/ReadHandler.cpp @@ -181,6 +181,7 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt VerifyOrReturnLogError(!mExchangeCtx, CHIP_ERROR_INCORRECT_STATE); VerifyOrReturnLogError(mSessionHandle, CHIP_ERROR_INCORRECT_STATE); auto exchange = InteractionModelEngine::GetInstance()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this); + VerifyOrReturnLogError(exchange != nullptr, CHIP_ERROR_INCORRECT_STATE); mExchangeCtx.Grab(exchange); } @@ -200,10 +201,12 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b VerifyOrReturnLogError(!mExchangeCtx, CHIP_ERROR_INCORRECT_STATE); VerifyOrReturnLogError(mSessionHandle, CHIP_ERROR_INCORRECT_STATE); auto exchange = InteractionModelEngine::GetInstance()->GetExchangeManager()->NewContext(mSessionHandle.Get().Value(), this); + VerifyOrReturnLogError(exchange != nullptr, CHIP_ERROR_INCORRECT_STATE); mExchangeCtx.Grab(exchange); } VerifyOrReturnLogError(mExchangeCtx, CHIP_ERROR_INCORRECT_STATE); + if (!IsReporting()) { mCurrentReportsBeginGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration(); diff --git a/src/app/ReadPrepareParams.h b/src/app/ReadPrepareParams.h index 6d95b8b0b48a17..c37a2ef6e36eab 100644 --- a/src/app/ReadPrepareParams.h +++ b/src/app/ReadPrepareParams.h @@ -29,15 +29,6 @@ namespace chip { namespace app { -/** - * @brief Used to specify the re-subscription policy. Namely, the method is invoked and provided the number of - * retries that have occurred so far. - * - * aShouldResubscribe and aNextSubscriptionIntervalMsec are outparams indicating whether and how long into - * the future a re-subscription should happen. - */ -typedef void (*OnResubscribePolicyCB)(uint32_t aNumCumulativeRetries, uint32_t & aNextSubscriptionIntervalMsec, - bool & aShouldResubscribe); struct ReadPrepareParams { @@ -51,12 +42,11 @@ struct ReadPrepareParams Optional mEventNumber; // The timeout for waiting for the response or System::Clock::kZero to let the interaction model decide the timeout based on the // MRP timeouts of the session. - System::Clock::Timeout mTimeout = System::Clock::kZero; - uint16_t mMinIntervalFloorSeconds = 0; - uint16_t mMaxIntervalCeilingSeconds = 0; - bool mKeepSubscriptions = false; - bool mIsFabricFiltered = true; - OnResubscribePolicyCB mResubscribePolicy = nullptr; + System::Clock::Timeout mTimeout = System::Clock::kZero; + uint16_t mMinIntervalFloorSeconds = 0; + uint16_t mMaxIntervalCeilingSeconds = 0; + bool mKeepSubscriptions = false; + bool mIsFabricFiltered = true; ReadPrepareParams() {} ReadPrepareParams(const SessionHandle & sessionHandle) { mSessionHolder.Grab(sessionHandle); } @@ -78,7 +68,6 @@ struct ReadPrepareParams other.mEventPathParamsListSize = 0; other.mpAttributePathParamsList = nullptr; other.mAttributePathParamsListSize = 0; - mResubscribePolicy = other.mResubscribePolicy; } ReadPrepareParams & operator=(ReadPrepareParams && other) @@ -103,7 +92,6 @@ struct ReadPrepareParams other.mEventPathParamsListSize = 0; other.mpAttributePathParamsList = nullptr; other.mAttributePathParamsListSize = 0; - mResubscribePolicy = other.mResubscribePolicy; return *this; } }; diff --git a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp index 13b4e85ce0af16..e1aacad7c580bd 100644 --- a/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp +++ b/src/app/clusters/operational-credentials-server/operational-credentials-server.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include diff --git a/src/app/server/Server.cpp b/src/app/server/Server.cpp index 6e8799911c554b..15674b10b92b38 100644 --- a/src/app/server/Server.cpp +++ b/src/app/server/Server.cpp @@ -221,9 +221,6 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams) app::DnssdServer::Instance().SetFabricTable(&mFabrics); app::DnssdServer::Instance().SetCommissioningModeProvider(&mCommissioningWindowManager); - err = chip::app::InteractionModelEngine::GetInstance()->Init(&mExchangeMgr, &GetFabricTable()); - SuccessOrExit(err); - chip::Dnssd::Resolver::Instance().Init(DeviceLayer::UDPEndPointManager()); #if CHIP_CONFIG_ENABLE_SERVER_IM_EVENT @@ -307,6 +304,9 @@ CHIP_ERROR Server::Init(const ServerInitParams & initParams) mCertificateValidityPolicy, mGroupsProvider); SuccessOrExit(err); + err = chip::app::InteractionModelEngine::GetInstance()->Init(&mExchangeMgr, &GetFabricTable(), &mCASESessionManager); + SuccessOrExit(err); + // This code is necessary to restart listening to existing groups after a reboot // Each manufacturer needs to validate that they can rejoin groups by placing this code at the appropriate location for them // diff --git a/src/app/tests/BUILD.gn b/src/app/tests/BUILD.gn index fd8d1571623cf6..6646c32fb4b6e4 100644 --- a/src/app/tests/BUILD.gn +++ b/src/app/tests/BUILD.gn @@ -32,6 +32,7 @@ static_library("helpers") { deps = [ "${chip_root}/src/access", + "${chip_root}/src/app", "${chip_root}/src/lib/support", "${chip_root}/src/messaging/tests:helpers", "${chip_root}/src/transport/raw/tests:helpers", diff --git a/src/app/tests/integration/chip_im_initiator.cpp b/src/app/tests/integration/chip_im_initiator.cpp index b83c598bce4b14..4636e14bc3fc0f 100644 --- a/src/app/tests/integration/chip_im_initiator.cpp +++ b/src/app/tests/integration/chip_im_initiator.cpp @@ -146,7 +146,6 @@ class MockInteractionModelApp : public ::chip::app::CommandSender::Callback, } } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} void OnAttributeData(const chip::app::ConcreteDataAttributePath & aPath, chip::TLV::TLVReader * aData, const chip::app::StatusIB & status) override {} diff --git a/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp b/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp index ab15e920fb8cbb..7cba1a4751d203 100644 --- a/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp +++ b/src/app/tests/suites/commands/interaction_model/InteractionModel.cpp @@ -125,8 +125,6 @@ void InteractionModel::OnSubscriptionEstablished(SubscriptionId subscriptionId) ContinueOnChipMainThread(CHIP_NO_ERROR); } -void InteractionModel::OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) {} - /////////// WriteClient Callback Interface ///////// void InteractionModel::OnResponse(const WriteClient * client, const ConcreteDataAttributePath & path, StatusIB status) { diff --git a/src/app/tests/suites/commands/interaction_model/InteractionModel.h b/src/app/tests/suites/commands/interaction_model/InteractionModel.h index 6d444ef6e1c9db..e097fdc0efa1b6 100644 --- a/src/app/tests/suites/commands/interaction_model/InteractionModel.h +++ b/src/app/tests/suites/commands/interaction_model/InteractionModel.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -436,7 +437,6 @@ class InteractionModel : public InteractionModelReports, void OnError(CHIP_ERROR error) override; void OnDone(chip::app::ReadClient * aReadClient) override; void OnSubscriptionEstablished(chip::SubscriptionId subscriptionId) override; - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override; /////////// WriteClient Callback Interface ///////// void OnResponse(const chip::app::WriteClient * client, const chip::app::ConcreteDataAttributePath & path, chip::app::StatusIB status) override; diff --git a/src/controller/CHIPDeviceControllerFactory.cpp b/src/controller/CHIPDeviceControllerFactory.cpp index 84f67a96883003..701259d6989308 100644 --- a/src/controller/CHIPDeviceControllerFactory.cpp +++ b/src/controller/CHIPDeviceControllerFactory.cpp @@ -205,8 +205,6 @@ CHIP_ERROR DeviceControllerFactory::InitSystemState(FactoryInitParams params) InitDataModelHandler(stateParams.exchangeMgr); - ReturnErrorOnFailure(chip::app::InteractionModelEngine::GetInstance()->Init(stateParams.exchangeMgr, stateParams.fabricTable)); - ReturnErrorOnFailure(Dnssd::Resolver::Instance().Init(stateParams.udpEndPointManager)); if (params.enableServerInteractions) @@ -266,6 +264,9 @@ CHIP_ERROR DeviceControllerFactory::InitSystemState(FactoryInitParams params) stateParams.caseSessionManager = Platform::New(); ReturnErrorOnFailure(stateParams.caseSessionManager->Init(stateParams.systemLayer, sessionManagerConfig)); + ReturnErrorOnFailure(chip::app::InteractionModelEngine::GetInstance()->Init(stateParams.exchangeMgr, stateParams.fabricTable, + stateParams.caseSessionManager)); + // store the system state mSystemState = chip::Platform::New(std::move(stateParams)); mSystemState->SetTempFabricTable(tempFabricTable); diff --git a/src/controller/TypedReadCallback.h b/src/controller/TypedReadCallback.h index 91bee1629bfa81..7fa58df7c932b4 100644 --- a/src/controller/TypedReadCallback.h +++ b/src/controller/TypedReadCallback.h @@ -124,12 +124,16 @@ class TypedReadAttributeCallback final : public app::ReadClient::Callback } } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { + ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); + if (mOnResubscriptionAttempt) { - mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, aNextResubscribeIntervalMsec); + mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, apReadClient->ComputeTimeTillNextSubscription()); } + + return CHIP_NO_ERROR; } void OnDeallocatePaths(chip::app::ReadPrepareParams && aReadPrepareParams) override @@ -246,12 +250,16 @@ class TypedReadEventCallback final : public app::ReadClient::Callback } } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(chip::app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { + ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); + if (mOnResubscriptionAttempt) { - mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, aNextResubscribeIntervalMsec); + mOnResubscriptionAttempt(*mReadClient.get(), aTerminationCause, apReadClient->ComputeTimeTillNextSubscription()); } + + return CHIP_NO_ERROR; } OnSuccessCallbackType mOnSuccess; diff --git a/src/controller/java/AndroidCallbacks.cpp b/src/controller/java/AndroidCallbacks.cpp index 56cf454c4429cb..1966bd59ecca68 100644 --- a/src/controller/java/AndroidCallbacks.cpp +++ b/src/controller/java/AndroidCallbacks.cpp @@ -547,22 +547,23 @@ void ReportEventCallback::OnSubscriptionEstablished(SubscriptionId aSubscription JniReferences::GetInstance().CallSubscriptionEstablished(mSubscriptionEstablishedCallbackRef); } -void ReportEventCallback::OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) +CHIP_ERROR ReportEventCallback::OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) { - VerifyOrReturn(mResubscriptionAttemptCallbackRef != nullptr, - ChipLogError(Controller, "mResubscriptionAttemptCallbackRef is null")); + VerifyOrReturnLogError(mResubscriptionAttemptCallbackRef != nullptr, CHIP_ERROR_INVALID_ARGUMENT); - CHIP_ERROR err = CHIP_NO_ERROR; - JNIEnv * env = JniReferences::GetInstance().GetEnvForCurrentThread(); + JNIEnv * env = JniReferences::GetInstance().GetEnvForCurrentThread(); + + ReturnErrorOnFailure(app::ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); jmethodID onResubscriptionAttemptMethod; - err = JniReferences::GetInstance().FindMethod(env, mResubscriptionAttemptCallbackRef, "onResubscriptionAttempt", "(II)V", - &onResubscriptionAttemptMethod); - VerifyOrReturn(err == CHIP_NO_ERROR, ChipLogError(Controller, "Could not find onResubscriptionAttempt method")); + ReturnLogErrorOnFailure(JniReferences::GetInstance().FindMethod( + env, mResubscriptionAttemptCallbackRef, "onResubscriptionAttempt", "(II)V", &onResubscriptionAttemptMethod)); DeviceLayer::StackUnlock unlock; env->CallVoidMethod(mResubscriptionAttemptCallbackRef, onResubscriptionAttemptMethod, aTerminationCause.AsInteger(), - aNextResubscribeIntervalMsec); + apReadClient->ComputeTimeTillNextSubscription()); + + return CHIP_NO_ERROR; } void ReportEventCallback::ReportError(jobject attributePath, CHIP_ERROR err) diff --git a/src/controller/java/AndroidCallbacks.h b/src/controller/java/AndroidCallbacks.h index 7d1dbb4434b964..9ba97362f53340 100644 --- a/src/controller/java/AndroidCallbacks.h +++ b/src/controller/java/AndroidCallbacks.h @@ -98,7 +98,7 @@ struct ReportEventCallback : public app::ReadClient::Callback void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override; - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override; + CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override; /** Report errors back to Java layer. attributePath may be nullptr for general errors. */ void ReportError(jobject eventPath, CHIP_ERROR err); diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 87fd371cf963c3..db29b86974a284 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -474,6 +474,9 @@ def __init__(self, transaction: 'AsyncReadTransaction', subscriptionId, devCtrl) self._subscriptionId = subscriptionId self._devCtrl = devCtrl self._isDone = False + self._onResubscriptionSucceededCb = None + self._onResubscriptionSucceededCb_isAsync = False + self._onResubscriptionAttemptedCb_isAsync = False def GetAttributes(self): ''' Returns the attribute value cache tracking the latest state on the publisher. @@ -493,14 +496,35 @@ def GetAttribute(self, path: TypedAttributePath) -> Any: def GetEvents(self): return self._readTransaction.GetAllEventValues() - def SetResubscriptionAttemptedCallback(self, callback: Callable[[SubscriptionTransaction, int, int], None]): + def OverrideLivenessTimeoutMs(self, timeoutMs: int): + handle = chip.native.GetLibraryHandle() + builtins.chipStack.Call( + lambda: handle.pychip_ReadClient_OverrideLivenessTimeout(self._readTransaction._pReadClient, timeoutMs) + ) + + def SetResubscriptionAttemptedCallback(self, callback: Callable[[SubscriptionTransaction, int, int], None], isAsync=False): ''' Sets the callback function that gets invoked anytime a re-subscription is attempted. The callback is expected to have the following signature: def Callback(transaction: SubscriptionTransaction, errorEncountered: int, nextResubscribeIntervalMsec: int) + + If the callback is an awaitable co-routine, isAsync should be set to True. ''' if callback is not None: self._onResubscriptionAttemptedCb = callback + self._onResubscriptionAttemptedCb_isAsync = isAsync + + def SetResubscriptionSucceededCallback(self, callback: Callback[[SubscriptionTransaction], None], isAsync=False): + ''' + Sets the callback function that gets invoked when a re-subscription attempt succeeds. The callback + is expected to have the following signature: + def Callback(transaction: SubscriptionTransaction) + + If the callback is an awaitable co-routine, isAsync should be set to True. + ''' + if callback is not None: + self._onResubscriptionSucceededCb = callback + self._onResubscriptionSucceededCb_isAsync = isAsync def SetAttributeUpdateCallback(self, callback: Callable[[TypedAttributePath, SubscriptionTransaction], None]): ''' @@ -550,7 +574,7 @@ def __repr__(self): return f'' -def DefaultResubscriptionAttemptedCallback(transaction: SubscriptionTransaction, terminationError, nextResubscribeIntervalMsec): +async def DefaultResubscriptionAttemptedCallback(transaction: SubscriptionTransaction, terminationError, nextResubscribeIntervalMsec): print(f"Previous subscription failed with Error: {terminationError} - re-subscribing in {nextResubscribeIntervalMsec}ms...") @@ -691,14 +715,26 @@ def _handleSubscriptionEstablished(self, subscriptionId): self._subscription_handler = SubscriptionTransaction( self, subscriptionId, self._devCtrl) self._future.set_result(self._subscription_handler) + else: + logging.info("Re-subscription succeeded!") + if self._subscription_handler._onResubscriptionSucceededCb is not None: + if (self._subscription_handler._onResubscriptionSucceededCb_isAsync): + self._event_loop.create_task( + self._subscription_handler._onResubscriptionSucceededCb(self._subscription_handler)) + else: + self._subscription_handler._onResubscriptionSucceededCb(self._subscription_handler) def handleSubscriptionEstablished(self, subscriptionId): self._event_loop.call_soon_threadsafe( self._handleSubscriptionEstablished, subscriptionId) def handleResubscriptionAttempted(self, terminationCause: int, nextResubscribeIntervalMsec: int): - self._event_loop.call_soon_threadsafe( - self._subscription_handler._onResubscriptionAttemptedCb, self._subscription_handler, terminationCause, nextResubscribeIntervalMsec) + if (self._subscription_handler._onResubscriptionAttemptedCb_isAsync): + self._event_loop.create_task(self._subscription_handler._onResubscriptionAttemptedCb( + self._subscription_handler, terminationCause, nextResubscribeIntervalMsec)) + else: + self._event_loop.call_soon_threadsafe( + self._subscription_handler._onResubscriptionAttemptedCb, self._subscription_handler, terminationCause, nextResubscribeIntervalMsec) def _handleReportBegin(self): pass diff --git a/src/controller/python/chip/clusters/attribute.cpp b/src/controller/python/chip/clusters/attribute.cpp index 59de72d01ffef5..488c1bc1c47521 100644 --- a/src/controller/python/chip/clusters/attribute.cpp +++ b/src/controller/python/chip/clusters/attribute.cpp @@ -15,6 +15,7 @@ * limitations under the License. */ +#include "system/SystemClock.h" #include #include #include @@ -22,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -146,9 +148,12 @@ class ReadClientCallback : public ReadClient::Callback gOnSubscriptionEstablishedCallback(mAppContext, aSubscriptionId); } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override + CHIP_ERROR OnResubscriptionNeeded(ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override { - gOnResubscriptionAttemptedCallback(mAppContext, aTerminationCause.AsInteger(), aNextResubscribeIntervalMsec); + ReturnErrorOnFailure(ReadClient::Callback::OnResubscriptionNeeded(apReadClient, aTerminationCause)); + gOnResubscriptionAttemptedCallback(mAppContext, aTerminationCause.AsInteger(), + apReadClient->ComputeTimeTillNextSubscription()); + return CHIP_NO_ERROR; } void OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus) override @@ -383,6 +388,12 @@ void pychip_ReadClient_Abort(ReadClient * apReadClient, ReadClientCallback * apC delete apCallback; } +void pychip_ReadClient_OverrideLivenessTimeout(ReadClient * pReadClient, uint32_t livenessTimeoutMs) +{ + VerifyOrDie(pReadClient != nullptr); + pReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(livenessTimeoutMs)); +} + chip::ChipError::StorageType pychip_ReadClient_Read(void * appContext, ReadClient ** pReadClient, ReadClientCallback ** pCallback, DeviceProxy * device, uint8_t * readParamsBuf, size_t numAttributePaths, size_t numDataversionFilters, size_t numEventPaths, ...) @@ -464,7 +475,6 @@ chip::ChipError::StorageType pychip_ReadClient_Read(void * appContext, ReadClien params.mMinIntervalFloorSeconds = pyParams.minInterval; params.mMaxIntervalCeilingSeconds = pyParams.maxInterval; params.mKeepSubscriptions = pyParams.keepSubscriptions; - params.mResubscribePolicy = PythonResubscribePolicy; dataVersionFilters.release(); attributePaths.release(); diff --git a/src/controller/python/test/test_scripts/base.py b/src/controller/python/test/test_scripts/base.py index 6cd812371f32c2..c742abe3476045 100644 --- a/src/controller/python/test/test_scripts/base.py +++ b/src/controller/python/test/test_scripts/base.py @@ -756,6 +756,52 @@ def CompareUnfilteredData(accessingFabric, otherFabric, expectedData): if (expectedDataFabric1 != readListDataFabric1): raise AssertionError("Got back mismatched data") + async def TestResubscription(self, nodeid: int): + ''' This validates the re-subscription logic by triggering a liveness failure caused by the expiration + of the underlying CASE session and the resultant failure to receive reports from the server. This should + trigger CASE session establishment and subscription restablishment. Both the attempt and successful + restablishment of the subscription are validated. + ''' + cv = asyncio.Condition() + resubAttempted = False + resubSucceeded = True + + async def OnResubscriptionAttempted(transaction, errorEncountered: int, nextResubscribeIntervalMsec: int): + self.logger.info("Re-subscription Attempted") + nonlocal resubAttempted + resubAttempted = True + + async def OnResubscriptionSucceeded(transaction): + self.logger.info("Re-subscription Succeeded") + nonlocal cv + async with cv: + cv.notify() + + subscription = await self.devCtrl.ReadAttribute(nodeid, [(Clusters.Basic.Attributes.ClusterRevision)], reportInterval=(0, 5)) + + # + # Register async callbacks that will fire when a re-sub is attempted or succeeds. + # + subscription.SetResubscriptionAttemptedCallback(OnResubscriptionAttempted, True) + subscription.SetResubscriptionSucceededCallback(OnResubscriptionSucceeded, True) + + # + # Over-ride the default liveness timeout (which is set quite high to accomodate for + # transport delays) to something very small. This ensures that our liveness timer will + # fire quickly and cause a re-subscription to occur naturally. + # + subscription.OverrideLivenessTimeoutMs(100) + + async with cv: + if (not(resubAttempted) or not(resubSucceeded)): + res = await asyncio.wait_for(cv.wait(), 3) + if not res: + self.logger.error("Timed out waiting for resubscription to succeed") + return False + + subscription.Shutdown() + return True + def TestCloseSession(self, nodeid: int): self.logger.info(f"Closing sessions with device {nodeid}") try: diff --git a/src/controller/python/test/test_scripts/mobile-device-test.py b/src/controller/python/test/test_scripts/mobile-device-test.py index f02a93dc388953..b6b7a1d4e91595 100755 --- a/src/controller/python/test/test_scripts/mobile-device-test.py +++ b/src/controller/python/test/test_scripts/mobile-device-test.py @@ -140,6 +140,10 @@ def TestDatamodel(test: BaseTestHelper, device_nodeid: int): FailIfNot(test.TestSubscription(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID), "Failed to subscribe attributes.") + logger.info("Testing re-subscription") + FailIfNot(asyncio.run(test.TestResubscription(nodeid=device_nodeid)), + "Failed to validated re-subscription") + logger.info("Testing on off cluster over resolved connection") FailIfNot(test.TestOnOffCluster(nodeid=device_nodeid, endpoint=LIGHTING_ENDPOINT_ID, diff --git a/src/controller/tests/TestEventChunking.cpp b/src/controller/tests/TestEventChunking.cpp index 522fcee706aff1..6764bb95e307c3 100644 --- a/src/controller/tests/TestEventChunking.cpp +++ b/src/controller/tests/TestEventChunking.cpp @@ -157,8 +157,6 @@ class TestReadCallback : public app::ReadClient::Callback void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablished = true; } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} - uint32_t mAttributeCount = 0; uint32_t mEventCount = 0; bool mOnReportEnd = false; diff --git a/src/controller/tests/TestReadChunking.cpp b/src/controller/tests/TestReadChunking.cpp index 2e50ff19e2041c..5dc6006edc09ba 100644 --- a/src/controller/tests/TestReadChunking.cpp +++ b/src/controller/tests/TestReadChunking.cpp @@ -136,8 +136,6 @@ class TestReadCallback : public app::ReadClient::Callback void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablished = true; } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} - uint32_t mAttributeCount = 0; bool mOnReportEnd = false; bool mOnSubscriptionEstablished = false; @@ -306,7 +304,6 @@ class TestMutableReadCallback : public app::ReadClient::Callback void OnReportEnd() override { mOnReportEnd = true; } void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override { mOnSubscriptionEstablished = true; } - void OnResubscriptionAttempt(CHIP_ERROR aTerminationCause, uint32_t aNextResubscribeIntervalMsec) override {} uint32_t mAttributeCount = 0; // We record every dataversion field from every attribute IB. diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 6c1f85cf686cf6..0dda7e31483ce4 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -16,6 +16,7 @@ * limitations under the License. */ +#include "system/SystemClock.h" #include "transport/SecureSession.h" #include #include @@ -1472,66 +1473,111 @@ void TestReadInteraction::TestReadAttributeTimeout(nlTestSuite * apSuite, void * NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } -// After client initiated subscription request, test expire session so that subscription fails to establish, and trigger the timeout -// error. Client would automatically try to resubscribe and bump the value for numResubscriptionAttemptedCalls. -void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) +class TestResubscriptionCallback : public app::ReadClient::Callback { - TestContext & ctx = *static_cast(apContext); - auto sessionHandle = ctx.GetSessionBobToAlice(); - bool onSuccessCbInvoked = false, onFailureCbInvoked = false; - responseDirective = kSendDataError; - uint32_t numSubscriptionEstablishedCalls = 0, numResubscriptionAttemptedCalls = 0; - // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's - // not safe to do so. - auto onSuccessCb = [&onSuccessCbInvoked](const app::ConcreteDataAttributePath & attributePath, const auto & dataResponse) { - onSuccessCbInvoked = true; - }; +public: + TestResubscriptionCallback() {} - // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's - // not safe to do so. - auto onFailureCb = [&onFailureCbInvoked, apSuite](const app::ConcreteDataAttributePath * attributePath, CHIP_ERROR aError) { - NL_TEST_ASSERT(apSuite, aError == CHIP_ERROR_TIMEOUT); - onFailureCbInvoked = true; - }; + void SetReadClient(app::ReadClient * apReadClient) { mpReadClient = apReadClient; } - auto onSubscriptionEstablishedCb = [&numSubscriptionEstablishedCalls](const app::ReadClient & readClient) { - numSubscriptionEstablishedCalls++; - }; + void OnDone(app::ReadClient *) override { mOnDone++; } - auto onSubscriptionAttemptedCb = [&numResubscriptionAttemptedCalls](const app::ReadClient & readClient, CHIP_ERROR aError, - uint32_t aNextResubscribeIntervalMsec) { - numResubscriptionAttemptedCalls++; - }; + void OnError(CHIP_ERROR aError) override + { + mOnError++; + mLastError = aError; + } - Controller::SubscribeAttribute( - &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 20, onSubscriptionEstablishedCb, - onSubscriptionAttemptedCb, false, true); + void OnSubscriptionEstablished(SubscriptionId aSubscriptionId) override + { + mOnSubscriptionEstablishedCount++; - ctx.ExpireSessionAliceToBob(); + // + // Set the liveness timeout to a super small number that isn't 0 to + // force the liveness timeout to fire. + // + mpReadClient->OverrideLivenessTimeout(System::Clock::Milliseconds32(10)); + } - ctx.DrainAndServiceIO(); + CHIP_ERROR OnResubscriptionNeeded(app::ReadClient * apReadClient, CHIP_ERROR aTerminationCause) override + { + mOnResubscriptionsAttempted++; + return apReadClient->ScheduleResubscription(apReadClient->ComputeTimeTillNextSubscription(), NullOptional, false); + } - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 1); + void ClearCounters() + { + mOnSubscriptionEstablishedCount = 0; + mOnDone = 0; + mOnError = 0; + mOnResubscriptionsAttempted = 0; + mLastError = CHIP_NO_ERROR; + } - ctx.ExpireSessionBobToAlice(); + int32_t mAttributeCount = 0; + int32_t mOnReportEnd = 0; + int32_t mOnSubscriptionEstablishedCount = 0; + int32_t mOnResubscriptionsAttempted = 0; + int32_t mOnDone = 0; + int32_t mOnError = 0; + CHIP_ERROR mLastError = CHIP_NO_ERROR; + app::ReadClient * mpReadClient = nullptr; +}; - ctx.DrainAndServiceIO(); +// +// This validates the re-subscription logic within ReadClient. This achieves it by overriding the timeout for the liveness +// timer within ReadClient to be a smaller value than the nominal max interval of the subscription. This causes the +// subscription to fail on the client side, triggering re-subscription. +// +// TODO: This does not validate the CASE establishment pathways since we're limited by the PASE-centric TestContext. +// +// +void TestReadInteraction::TestSubscribeAttributeTimeout(nlTestSuite * apSuite, void * apContext) +{ + TestContext & ctx = *static_cast(apContext); + auto sessionHandle = ctx.GetSessionBobToAlice(); - NL_TEST_ASSERT(apSuite, - !onSuccessCbInvoked && !onFailureCbInvoked && numSubscriptionEstablishedCalls == 0 && - numResubscriptionAttemptedCalls == 1); + { + TestResubscriptionCallback callback; + app::ReadClient readClient(app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), callback, + app::ReadClient::InteractionType::Subscribe); - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); + callback.SetReadClient(&readClient); - NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == 0); + app::ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice()); - // - // Let's put back the sessions so that the next tests (which assume a valid initialized set of sessions) - // can function correctly. - // - ctx.CreateSessionAliceToBob(); - ctx.CreateSessionBobToAlice(); + // Read full wildcard paths, repeat twice to ensure chunking. + app::AttributePathParams attributePathParams[1]; + readPrepareParams.mpAttributePathParamsList = attributePathParams; + readPrepareParams.mAttributePathParamsListSize = ArraySize(attributePathParams); + + attributePathParams[0].mClusterId = app::Clusters::TestCluster::Id; + attributePathParams[0].mAttributeId = app::Clusters::TestCluster::Attributes::Boolean::Id; + + readPrepareParams.mMaxIntervalCeilingSeconds = 1; + + readClient.SendAutoResubscribeRequest(std::move(readPrepareParams)); + + // + // Drive servicing IO till we have established a subscription at least 2 times. + // + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(2), + [&]() { return callback.mOnSubscriptionEstablishedCount > 1; }); + + NL_TEST_ASSERT(apSuite, callback.mOnDone == 0); + // + // With re-sub enabled, we shouldn't encounter any errors. + // + NL_TEST_ASSERT(apSuite, callback.mOnError == 0); + + // + // We should have attempted just one re-subscription. + // + NL_TEST_ASSERT(apSuite, callback.mOnResubscriptionsAttempted == 1); + } + + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } @@ -4270,7 +4316,6 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadHandler_TwoSubscribesMultipleReads", TestReadInteraction::TestReadHandler_TwoSubscribesMultipleReads), NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleReads", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads), NL_TEST_DEF("TestReadAttributeTimeout", TestReadInteraction::TestReadAttributeTimeout), - NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout), NL_TEST_DEF("TestReadHandler_SubscriptionReportingIntervalsTest1", TestReadInteraction::TestReadHandler_SubscriptionReportingIntervalsTest1), NL_TEST_DEF("TestReadHandler_SubscriptionReportingIntervalsTest2", TestReadInteraction::TestReadHandler_SubscriptionReportingIntervalsTest2), NL_TEST_DEF("TestReadHandler_SubscriptionReportingIntervalsTest3", TestReadInteraction::TestReadHandler_SubscriptionReportingIntervalsTest3), @@ -4289,6 +4334,7 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadAttribute_ManyDataValues", TestReadInteraction::TestReadAttribute_ManyDataValues), NL_TEST_DEF("TestReadAttribute_ManyDataValuesWrongPath", TestReadInteraction::TestReadAttribute_ManyDataValuesWrongPath), NL_TEST_DEF("TestReadAttribute_ManyErrors", TestReadInteraction::TestReadAttribute_ManyErrors), + NL_TEST_DEF("TestSubscribeAttributeTimeout", TestReadInteraction::TestSubscribeAttributeTimeout), NL_TEST_SENTINEL() }; // clang-format on