diff --git a/src/app/InteractionModelEngine.cpp b/src/app/InteractionModelEngine.cpp index f8b71aa327a6cb..56b412b2655bfe 100644 --- a/src/app/InteractionModelEngine.cpp +++ b/src/app/InteractionModelEngine.cpp @@ -372,25 +372,13 @@ CHIP_ERROR InteractionModelEngine::OnReadInitialRequest(Messaging::ExchangeConte } } - size_t handlerPoolCapacity = mReadHandlers.Capacity(); - #if CONFIG_IM_BUILD_FOR_UNIT_TEST + size_t handlerPoolCapacity = mReadHandlers.Capacity(); if (mReadHandlerCapacityOverride != -1) { handlerPoolCapacity = (size_t) mReadHandlerCapacityOverride; } -#endif - - // Reserve the last ReadHandler for ReadInteraction - if (aInteractionType == ReadHandler::InteractionType::Subscribe && ((handlerPoolCapacity - GetNumActiveReadHandlers()) == 1) && - !HasActiveRead()) - { - ChipLogDetail(InteractionModel, "Reserve the last ReadHandler for IM read Interaction"); - aStatus = Protocols::InteractionModel::Status::ResourceExhausted; - return CHIP_NO_ERROR; - } -#if CONFIG_IM_BUILD_FOR_UNIT_TEST if ((handlerPoolCapacity - GetNumActiveReadHandlers()) == 0) { aStatus = Protocols::InteractionModel::Status::ResourceExhausted; @@ -668,9 +656,27 @@ bool InteractionModelEngine::EnsureResourceForSubscription(FabricIndex aFabricIn const size_t readHandlerCap = allowUnlimited ? SIZE_MAX : static_cast(readHandlerPoolCapacity - kReservedHandlersForReads); - size_t usedAttributePaths = mAttributePathPool.Allocated(); - size_t usedEventPaths = mEventPathPool.Allocated(); - size_t usedReadHandlers = mReadHandlers.Allocated(); + size_t usedAttributePaths = 0; + size_t usedEventPaths = 0; + size_t usedReadHandlers = 0; + + auto countResourceUsage = [&]() { + usedAttributePaths = 0; + usedEventPaths = 0; + usedReadHandlers = 0; + mReadHandlers.ForEachActiveObject([&](auto * handler) { + if (!handler->IsType(ReadHandler::InteractionType::Subscribe)) + { + return Loop::Continue; + } + usedAttributePaths += handler->GetAttributePathCount(); + usedEventPaths += handler->GetEventPathCount(); + usedReadHandlers++; + return Loop::Continue; + }); + }; + + countResourceUsage(); if (usedAttributePaths + aRequestedAttributePathCount <= attributePathCap && usedEventPaths + aRequestedEventPathCount <= eventPathCap && usedReadHandlers < readHandlerCap) @@ -688,10 +694,8 @@ bool InteractionModelEngine::EnsureResourceForSubscription(FabricIndex aFabricIn } const auto evictAndUpdateResourceUsage = [&](FabricIndex fabricIndex, bool forceEvict) { - bool ret = TrimFabric(fabricIndex, forceEvict); - usedAttributePaths = mAttributePathPool.Allocated(); - usedEventPaths = mEventPathPool.Allocated(); - usedReadHandlers = mReadHandlers.Allocated(); + bool ret = TrimFabric(fabricIndex, forceEvict); + countResourceUsage(); return ret; }; @@ -1181,5 +1185,18 @@ uint16_t InteractionModelEngine::GetMinSubscriptionsPerFabric() const return static_cast(perFabricSubscriptionCapacity); } +size_t InteractionModelEngine::GetNumDirtySubscriptions() const +{ + size_t numDirtySubscriptions = 0; + mReadHandlers.ForEachActiveObject([&](const auto readHandler) { + if (readHandler->IsType(ReadHandler::InteractionType::Subscribe) && readHandler->IsDirty()) + { + numDirtySubscriptions++; + } + return Loop::Continue; + }); + return numDirtySubscriptions; +} + } // namespace app } // namespace chip diff --git a/src/app/InteractionModelEngine.h b/src/app/InteractionModelEngine.h index 1bb5c8f6a3ece3..59966967bf5c40 100644 --- a/src/app/InteractionModelEngine.h +++ b/src/app/InteractionModelEngine.h @@ -231,6 +231,11 @@ class InteractionModelEngine : public Messaging::UnsolicitedMessageHandler, */ size_t GetNumActiveReadClients(); + /** + * Returns the number of dirty subscriptions. Including the subscriptions that are generating reports. + */ + size_t GetNumDirtySubscriptions() const; + /** * Returns whether the write operation to the given path is conflict with another write operations. (i.e. another write * transaction is in the middle of processing the chunked value of the given path.) diff --git a/src/controller/tests/data_model/TestRead.cpp b/src/controller/tests/data_model/TestRead.cpp index 66dcadc5f29087..c126b79432fadc 100644 --- a/src/controller/tests/data_model/TestRead.cpp +++ b/src/controller/tests/data_model/TestRead.cpp @@ -39,12 +39,13 @@ using namespace chip::Protocols; namespace { -constexpr EndpointId kTestEndpointId = 1; -constexpr DataVersion kDataVersion = 5; -bool expectedAttribute1 = true; -int16_t expectedAttribute2 = 42; -uint64_t expectedAttribute3 = 0xdeadbeef0000cafe; -uint8_t expectedAttribute4[256] = { +constexpr EndpointId kTestEndpointId = 1; +constexpr DataVersion kDataVersion = 5; +constexpr AttributeId kNeverEndAttributeid = Test::MockAttributeId(1); +bool expectedAttribute1 = true; +int16_t expectedAttribute2 = 42; +uint64_t expectedAttribute3 = 0xdeadbeef0000cafe; +uint8_t expectedAttribute4[256] = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf, @@ -109,6 +110,28 @@ CHIP_ERROR ReadSingleClusterData(const Access::SubjectDescriptor & aSubjectDescr return valueEncoder.Encode(++totalReadCount); } + if (aPath.mClusterId == app::Clusters::TestCluster::Id && aPath.mAttributeId == kNeverEndAttributeid) + { + AttributeValueEncoder::AttributeEncodeState state = AttributeValueEncoder::AttributeEncodeState(); + AttributeValueEncoder valueEncoder(aAttributeReports, aSubjectDescriptor.fabricIndex, aPath, + kDataVersion /* data version */, aIsFabricFiltered, state); + + CHIP_ERROR err = valueEncoder.EncodeList([](const auto & encoder) -> CHIP_ERROR { + encoder.Encode(static_cast(1)); + return CHIP_ERROR_NO_MEMORY; + }); + + if (err != CHIP_NO_ERROR) + { + // If the err is not CHIP_NO_ERROR, means the encoding was aborted, then the valueEncoder may save its state. + // The state is used by list chunking feature for now. + if (apEncoderState != nullptr) + { + *apEncoderState = valueEncoder.GetState(); + } + return err; + } + } AttributeReportIB::Builder & attributeReport = aAttributeReports.CreateAttributeReport(); ReturnErrorOnFailure(aAttributeReports.GetError()); @@ -196,7 +219,6 @@ class TestReadInteraction : public app::ReadHandler::ApplicationCallback static void TestReadHandler_TwoSubscribesMultipleReads(nlTestSuite * apSuite, void * apContext); static void TestReadHandler_MultipleSubscriptionsWithDataVersionFilter(nlTestSuite * apSuite, void * apContext); static void TestReadHandler_SubscriptionAlteredReportingIntervals(nlTestSuite * apSuite, void * apContext); - static void TestReadHandlerResourceExhaustion_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext); static void TestReadHandlerResourceExhaustion_MultipleReads(nlTestSuite * apSuite, void * apContext); static void TestReadSubscribeAttributeResponseWithCache(nlTestSuite * apSuite, void * apContext); static void TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite * apSuite, void * apContext); @@ -1837,63 +1859,6 @@ void TestReadInteraction::TestReadHandler_MultipleSubscriptionsWithDataVersionFi NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); } -void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscriptions(nlTestSuite * apSuite, void * apContext) -{ - TestContext & ctx = *static_cast(apContext); - auto sessionHandle = ctx.GetSessionBobToAlice(); - uint32_t numSuccessCalls = 0; - uint32_t numFailureCalls = 0; - uint32_t numSubscriptionEstablishedCalls = 0; - - responseDirective = kSendDataResponse; - - // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's - // not safe to do so. - auto onSuccessCb = [&numSuccessCalls](const app::ConcreteDataAttributePath & attributePath, const auto & dataResponse) { - numSuccessCalls++; - }; - - // Passing of stack variables by reference is only safe because of synchronous completion of the interaction. Otherwise, it's - // not safe to do so. - auto onFailureCb = [&apSuite, &numFailureCalls](const app::ConcreteDataAttributePath * attributePath, CHIP_ERROR aError) { - numFailureCalls++; - - NL_TEST_ASSERT(apSuite, aError == CHIP_IM_GLOBAL_STATUS(ResourceExhausted)); - NL_TEST_ASSERT(apSuite, attributePath == nullptr); - }; - - auto onSubscriptionEstablishedCb = [&numSubscriptionEstablishedCalls](const app::ReadClient & readClient) { - numSubscriptionEstablishedCalls++; - }; - - // - // Artifically limit the capacity to 2 ReadHandlers. This will also validate reservation of handlers for Reads, - // since the second subscription below should fail correctly. - // - app::InteractionModelEngine::GetInstance()->SetHandlerCapacity(2); - NL_TEST_ASSERT(apSuite, - Controller::SubscribeAttribute( - &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 10, - onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); - - NL_TEST_ASSERT(apSuite, - Controller::SubscribeAttribute( - &ctx.GetExchangeManager(), sessionHandle, kTestEndpointId, onSuccessCb, onFailureCb, 0, 10, - onSubscriptionEstablishedCb, false, true) == CHIP_NO_ERROR); - - ctx.DrainAndServiceIO(); - - NL_TEST_ASSERT(apSuite, numSuccessCalls == 1); - NL_TEST_ASSERT(apSuite, numSubscriptionEstablishedCalls == 1); - // Resubscription is happening for second subscribe call - NL_TEST_ASSERT(apSuite, numFailureCalls == 0); - - app::InteractionModelEngine::GetInstance()->SetHandlerCapacity(-1); - app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); - - NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0); -} - void TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads(nlTestSuite * apSuite, void * apContext) { TestContext & ctx = *static_cast(apContext); @@ -2082,23 +2047,49 @@ class TestReadCallback : public app::ReadClient::Callback CHIP_ERROR mLastError = CHIP_NO_ERROR; }; -void EstablishSubscriptions(nlTestSuite * apSuite, SessionHandle sessionHandle, int32_t numSubs, int32_t pathPerSub, - app::ReadClient::Callback * callback, std::vector> & readClients) +class TestPerpetualListReadCallback : public app::ReadClient::Callback { - std::vector attributePaths( - pathPerSub, app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id)); +public: + TestPerpetualListReadCallback() {} + void OnAttributeData(const app::ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, + const app::StatusIB & aStatus) override + { + if (apData != nullptr) + { + reportsReceived++; + app::AttributePathParams path; + path.mEndpointId = aPath.mEndpointId; + path.mClusterId = aPath.mClusterId; + path.mAttributeId = aPath.mAttributeId; + app::InteractionModelEngine::GetInstance()->GetReportingEngine().SetDirty(path); + } + } + + void OnDone(chip::app::ReadClient *) override {} + + int32_t reportsReceived = 0; +}; + +void EstablishReadOrSubscriptions(nlTestSuite * apSuite, SessionHandle sessionHandle, int32_t numSubs, int32_t pathPerSub, + app::AttributePathParams path, app::ReadClient::InteractionType type, + app::ReadClient::Callback * callback, std::vector> & readClients) +{ + std::vector attributePaths(pathPerSub, path); app::ReadPrepareParams readParams(sessionHandle); readParams.mpAttributePathParamsList = attributePaths.data(); readParams.mAttributePathParamsListSize = pathPerSub; - readParams.mMaxIntervalCeilingSeconds = 1; - readParams.mKeepSubscriptions = true; + if (type == app::ReadClient::InteractionType::Subscribe) + { + readParams.mMaxIntervalCeilingSeconds = 1; + readParams.mKeepSubscriptions = true; + } for (int32_t i = 0; i < numSubs; i++) { - std::unique_ptr readClient = std::make_unique( - app::InteractionModelEngine::GetInstance(), app::InteractionModelEngine::GetInstance()->GetExchangeManager(), *callback, - app::ReadClient::InteractionType::Subscribe); + std::unique_ptr readClient = + std::make_unique(app::InteractionModelEngine::GetInstance(), + app::InteractionModelEngine::GetInstance()->GetExchangeManager(), *callback, type); NL_TEST_ASSERT(apSuite, readClient->SendRequest(readParams) == CHIP_NO_ERROR); readClients.push_back(std::move(readClient)); } @@ -2108,6 +2099,7 @@ void EstablishSubscriptions(nlTestSuite * apSuite, SessionHandle sessionHandle, void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite * apSuite, void * apContext) { + // Note: We cannot use ctx.DrainAndServiceIO() since the perpetual read will make DrainAndServiceIO never return. using namespace SubscriptionPathQuotaHelpers; TestContext & ctx = *static_cast(apContext); auto sessionHandle = ctx.GetSessionBobToAlice(); @@ -2118,21 +2110,54 @@ void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite app::InteractionModelEngine::GetInstance()->RegisterReadHandlerAppCallback(&gTestReadInteraction); + // Here, we set up two background perpetual read requests to simulate parallel Read + Subscriptions. + // We don't care about the data read, we only care about the existence of such read transactions. TestReadCallback readCallback; TestReadCallback readCallbackFabric2; + TestPerpetualListReadCallback perpetualReadCallback; std::vector> readClients; + EstablishReadOrSubscriptions(apSuite, ctx.GetSessionAliceToBob(), 1, 1, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, kNeverEndAttributeid), + app::ReadClient::InteractionType::Read, &perpetualReadCallback, readClients); + EstablishReadOrSubscriptions(apSuite, ctx.GetSessionBobToAlice(), 1, 1, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, kNeverEndAttributeid), + app::ReadClient::InteractionType::Read, &perpetualReadCallback, readClients); + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(5), [&]() { + return app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers(app::ReadHandler::InteractionType::Read) == 2; + }); + // Ensure our read transactions are established. + NL_TEST_ASSERT(apSuite, + app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers(app::ReadHandler::InteractionType::Read) == + 2); + // Intentially establish subscriptions using exceeded resources. app::InteractionModelEngine::GetInstance()->SetForceHandlerQuota(false); - EstablishSubscriptions(apSuite, ctx.GetSessionBobToAlice(), 1, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription + 1, &readCallback, readClients); - EstablishSubscriptions(apSuite, ctx.GetSessionBobToAlice(), kExpectedParallelSubs, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription, &readCallback, readClients); + // + // We establish 1 subscription that exceeds the minimum supported paths (but is still established since the + // target has sufficient resources), and kExpectedParallelSubs subscriptions that conform to the minimum + // supported paths. This sets the stage to make it possible to test eviction of subscriptions that are in violation + // of the minimum later below. + // + // Subscription A + EstablishReadOrSubscriptions(apSuite, ctx.GetSessionBobToAlice(), 1, + app::InteractionModelEngine::kMinSupportedPathsPerSubscription + 1, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &readCallback, readClients); + // Subscription B + EstablishReadOrSubscriptions(apSuite, ctx.GetSessionBobToAlice(), kExpectedParallelSubs, + app::InteractionModelEngine::kMinSupportedPathsPerSubscription, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &readCallback, readClients); // There are too many messages and the test (gcc_debug, which includes many sanity checks) will be quite slow. Note: report // engine is using ScheduleWork which cannot be handled by DrainAndServiceIO correctly. - ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(60), - [&]() { return readCallback.mOnSubscriptionEstablishedCount == kExpectedParallelSubs + 1; }); + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(5), [&]() { + return readCallback.mOnSubscriptionEstablishedCount == kExpectedParallelSubs + 1 && + readCallback.mAttributeCount == + static_cast(kExpectedParallelSubs * app::InteractionModelEngine::kMinSupportedPathsPerSubscription + + app::InteractionModelEngine::kMinSupportedPathsPerSubscription + 1); + }); NL_TEST_ASSERT(apSuite, readCallback.mAttributeCount == @@ -2141,37 +2166,55 @@ void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite NL_TEST_ASSERT(apSuite, readCallback.mOnSubscriptionEstablishedCount == kExpectedParallelSubs + 1); // We have set up the environment for testing the evicting logic. + // We now have a full stable of subscriptions setup AND we've artificially limited the capacity, creation of further + // subscriptions will require the eviction of existing subscriptions, OR potential rejection of the subscription if it exceeds + // minimas. app::InteractionModelEngine::GetInstance()->SetForceHandlerQuota(true); app::InteractionModelEngine::GetInstance()->SetHandlerCapacityForSubscriptions(kExpectedParallelSubs); app::InteractionModelEngine::GetInstance()->SetPathPoolCapacityForSubscriptions(kExpectedParallelPaths); - // The following check will trigger the logic in im to kill the read handlers that uses more paths than the limit per fabric. + // Part 1: Test per subscription minimas. + // Rejection of the subscription that exceeds minimas. { TestReadCallback callback; std::vector> outReadClient; - EstablishSubscriptions(apSuite, ctx.GetSessionBobToAlice(), 1, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription + 1, &callback, outReadClient); + EstablishReadOrSubscriptions( + apSuite, ctx.GetSessionBobToAlice(), 1, app::InteractionModelEngine::kMinSupportedPathsPerSubscription + 1, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &callback, outReadClient); - ctx.DrainAndServiceIO(); + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(5), [&]() { return callback.mOnError == 1; }); // Over-sized request after used all paths will receive Paths Exhausted status code. NL_TEST_ASSERT(apSuite, callback.mOnError == 1); NL_TEST_ASSERT(apSuite, callback.mLastError == CHIP_IM_GLOBAL_STATUS(PathsExhausted)); } - // The following check will trigger the logic in im to kill the read handlers that uses more paths than the limit per fabric. + // This next test validates that a compliant subscription request will kick out an existing subscription (arguably, the one that + // was previously established with more paths than the limit per fabric) { - EstablishSubscriptions(apSuite, ctx.GetSessionBobToAlice(), 1, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription, &readCallback, readClients); + EstablishReadOrSubscriptions( + apSuite, ctx.GetSessionBobToAlice(), 1, app::InteractionModelEngine::kMinSupportedPathsPerSubscription, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &readCallback, readClients); readCallback.ClearCounters(); - ctx.DrainAndServiceIO(); - - // This read handler should evict some existing subscriptions for enough space + // Run until the new subscription got setup fully as viewed by the client. + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(5), [&]() { + return readCallback.mOnSubscriptionEstablishedCount == 1 && + readCallback.mAttributeCount == app::InteractionModelEngine::kMinSupportedPathsPerSubscription; + }); + + // This read handler should evict some existing subscriptions for enough space. + // Validate that the new subscription got setup fully as viewed by the client. And we will validate we handled this + // subscription by evicting the correct subscriptions later. NL_TEST_ASSERT(apSuite, readCallback.mOnSubscriptionEstablishedCount == 1); NL_TEST_ASSERT(apSuite, readCallback.mAttributeCount == app::InteractionModelEngine::kMinSupportedPathsPerSubscription); } + // Validate we evicted the right subscription for handling the new subscription above. + // We should used **exactly** all resources for subscriptions if we have evicted the correct subscription, and we validate the + // number of used paths by mark all subscriptions as dirty, and count the number of received reports. { app::AttributePathParams path; path.mEndpointId = kTestEndpointId; @@ -2181,16 +2224,23 @@ void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite } readCallback.ClearCounters(); - ctx.DrainAndServiceIO(); - // Ensure the global dirty set is cleared. - NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetReportingEngine().GetGlobalDirtySetSize() == 0); - - // We should evict the subscriptions with excess resources, so we should use exactly all resources. + // Run until all subscriptions are clean. + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(60), + [&]() { return app::InteractionModelEngine::GetInstance()->GetNumDirtySubscriptions() == 0; }); + + // Before the above subscription, we have one subscription with kMinSupportedPathsPerSubscription + 1 paths, we should evict + // that subscription before evicting any other subscriptions, which will result we used exactly kExpectedParallelPaths and have + // exactly kExpectedParallelSubs. + // We have exactly one subscription than uses more resources than others, so the interaction model must evict it first, and we + // will have exactly kExpectedParallelPaths only when that subscription have been evicted. We use this indirect method to verify + // the subscriptions since the read client won't shutdown until the timeout fired. NL_TEST_ASSERT(apSuite, readCallback.mAttributeCount == kExpectedParallelPaths); NL_TEST_ASSERT(apSuite, - app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers() == - static_cast(kExpectedParallelSubs)); + app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers( + app::ReadHandler::InteractionType::Subscribe) == static_cast(kExpectedParallelSubs)); + // Part 2: Testing per fabric minimas. + // Validate we have more than kMinSupportedSubscriptionsPerFabric subscriptions for testing per fabric minimas. NL_TEST_ASSERT(apSuite, app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers( app::ReadHandler::InteractionType::Subscribe, ctx.GetAliceFabricIndex()) > @@ -2198,14 +2248,22 @@ void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite // The following check will trigger the logic in im to kill the read handlers that use more paths than the limit per fabric. { - EstablishSubscriptions(apSuite, ctx.GetSessionAliceToBob(), - app::InteractionModelEngine::kMinSupportedSubscriptionsPerFabric, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription, &readCallbackFabric2, readClients); - - ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(60), - [&]() { return readCallbackFabric2.mOnSubscriptionEstablishedCount == 1; }); - - // This read handler should evict some existing subscriptions for enough space + EstablishReadOrSubscriptions( + apSuite, ctx.GetSessionAliceToBob(), app::InteractionModelEngine::kMinSupportedSubscriptionsPerFabric, + app::InteractionModelEngine::kMinSupportedPathsPerSubscription, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &readCallbackFabric2, readClients); + + // Run until we have established the subscriptions. + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(5), [&]() { + return readCallbackFabric2.mOnSubscriptionEstablishedCount == + app::InteractionModelEngine::kMinSupportedSubscriptionsPerFabric && + readCallbackFabric2.mAttributeCount == + app::InteractionModelEngine::kMinSupportedPathsPerSubscription * + app::InteractionModelEngine::kMinSupportedSubscriptionsPerFabric; + }); + + // Verify the subscriptions are established successfully. We will check if we evicted the expected subscriptions later. NL_TEST_ASSERT(apSuite, readCallbackFabric2.mOnSubscriptionEstablishedCount == app::InteractionModelEngine::kMinSupportedSubscriptionsPerFabric); @@ -2215,6 +2273,7 @@ void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite app::InteractionModelEngine::kMinSupportedSubscriptionsPerFabric); } + // Validate the subscriptions are handled by evicting one or more subscriptions from Fabric A. { app::AttributePathParams path; path.mEndpointId = kTestEndpointId; @@ -2225,11 +2284,9 @@ void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite readCallback.ClearCounters(); readCallbackFabric2.ClearCounters(); - // Run until the global dirtyset is cleared. - ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(60), []() { - return app::InteractionModelEngine::GetInstance()->GetReportingEngine().GetGlobalDirtySetSize() == 0 && - app::InteractionModelEngine::GetInstance()->GetReportingEngine().GetNumReportsInFlight() == 0; - }); + // Run until all subscriptions are clean. + ctx.GetIOContext().DriveIOUntil(System::Clock::Seconds16(60), + [&]() { return app::InteractionModelEngine::GetInstance()->GetNumDirtySubscriptions() == 0; }); // Some subscriptions on fabric 1 should be evicted since fabric 1 is using more resources than the limits. NL_TEST_ASSERT(apSuite, @@ -2249,6 +2306,11 @@ void TestReadInteraction::TestReadHandler_KillOverQuotaSubscriptions(nlTestSuite app::ReadHandler::InteractionType::Subscribe, ctx.GetBobFabricIndex()) == app::InteractionModelEngine::kMinSupportedSubscriptionsPerFabric); + // Ensure our read transactions are still alive. + NL_TEST_ASSERT(apSuite, + app::InteractionModelEngine::GetInstance()->GetNumActiveReadHandlers(app::ReadHandler::InteractionType::Read) == + 2); + app::InteractionModelEngine::GetInstance()->ShutdownActiveReads(); ctx.DrainAndServiceIO(); @@ -2281,8 +2343,10 @@ void TestReadInteraction::TestReadHandler_KillOldestSubscriptions(nlTestSuite * app::InteractionModelEngine::GetInstance()->SetPathPoolCapacityForSubscriptions(kExpectedParallelPaths); // This should just use all availbale resources. - EstablishSubscriptions(apSuite, ctx.GetSessionBobToAlice(), kExpectedParallelSubs, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription, &readCallback, readClients); + EstablishReadOrSubscriptions(apSuite, ctx.GetSessionBobToAlice(), kExpectedParallelSubs, + app::InteractionModelEngine::kMinSupportedPathsPerSubscription, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &readCallback, readClients); ctx.DrainAndServiceIO(); @@ -2299,8 +2363,10 @@ void TestReadInteraction::TestReadHandler_KillOldestSubscriptions(nlTestSuite * { TestReadCallback callback; std::vector> outReadClient; - EstablishSubscriptions(apSuite, ctx.GetSessionBobToAlice(), 1, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription + 1, &callback, outReadClient); + EstablishReadOrSubscriptions( + apSuite, ctx.GetSessionBobToAlice(), 1, app::InteractionModelEngine::kMinSupportedPathsPerSubscription + 1, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &callback, outReadClient); ctx.DrainAndServiceIO(); @@ -2311,8 +2377,10 @@ void TestReadInteraction::TestReadHandler_KillOldestSubscriptions(nlTestSuite * // The following check will trigger the logic in im to kill the read handlers that uses more paths than the limit per fabric. { - EstablishSubscriptions(apSuite, ctx.GetSessionBobToAlice(), 1, - app::InteractionModelEngine::kMinSupportedPathsPerSubscription, &readCallback, readClients); + EstablishReadOrSubscriptions( + apSuite, ctx.GetSessionBobToAlice(), 1, app::InteractionModelEngine::kMinSupportedPathsPerSubscription, + app::AttributePathParams(kTestEndpointId, TestCluster::Id, TestCluster::Attributes::Int16u::Id), + app::ReadClient::InteractionType::Subscribe, &readCallback, readClients); readCallback.ClearCounters(); ctx.DrainAndServiceIO(); @@ -2526,7 +2594,6 @@ const nlTest sTests[] = NL_TEST_DEF("TestReadHandler_MultipleReads", TestReadInteraction::TestReadHandler_MultipleReads), NL_TEST_DEF("TestReadHandler_OneSubscribeMultipleReads", TestReadInteraction::TestReadHandler_OneSubscribeMultipleReads), NL_TEST_DEF("TestReadHandler_TwoSubscribesMultipleReads", TestReadInteraction::TestReadHandler_TwoSubscribesMultipleReads), - NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleSubscriptions", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleSubscriptions), NL_TEST_DEF("TestReadHandlerResourceExhaustion_MultipleReads", TestReadInteraction::TestReadHandlerResourceExhaustion_MultipleReads), NL_TEST_DEF("TestReadAttributeTimeout", TestReadInteraction::TestReadAttributeTimeout), NL_TEST_DEF("TestReadHandler_SubscriptionAlteredReportingIntervals", TestReadInteraction::TestReadHandler_SubscriptionAlteredReportingIntervals),