diff --git a/src/darwin/Framework/CHIP/MTRBaseDevice.mm b/src/darwin/Framework/CHIP/MTRBaseDevice.mm index a2d6a22a475126..18952f0efc690d 100644 --- a/src/darwin/Framework/CHIP/MTRBaseDevice.mm +++ b/src/darwin/Framework/CHIP/MTRBaseDevice.mm @@ -251,11 +251,11 @@ - (void)invalidateCASESession class SubscriptionCallback final : public MTRBaseSubscriptionCallback { public: - SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback, + SubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback, ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionScheduledHandler, MTRSubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler) - : MTRBaseSubscriptionCallback(queue, attributeReportCallback, eventReportCallback, errorCallback, - resubscriptionScheduledHandler, subscriptionEstablishedHandler, onDoneHandler) + : MTRBaseSubscriptionCallback(attributeReportCallback, eventReportCallback, errorCallback, resubscriptionScheduledHandler, + subscriptionEstablishedHandler, onDoneHandler) { } @@ -286,80 +286,120 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue // Copy params before going async. params = [params copy]; - [self.deviceController - getSessionForNode:self.nodeID - completion:^(ExchangeManager * _Nullable exchangeManager, const Optional & session, - NSError * _Nullable error) { - if (error != nil) { - dispatch_async(queue, ^{ - errorHandler(error); - }); - return; - } - - // Wildcard endpoint, cluster, attribute, event. - auto attributePath = std::make_unique(); - auto eventPath = std::make_unique(); - ReadPrepareParams readParams(session.Value()); - readParams.mMinIntervalFloorSeconds = [params.minInterval unsignedShortValue]; - readParams.mMaxIntervalCeilingSeconds = [params.maxInterval unsignedShortValue]; - readParams.mpAttributePathParamsList = attributePath.get(); - readParams.mAttributePathParamsListSize = 1; - readParams.mpEventPathParamsList = eventPath.get(); - readParams.mEventPathParamsListSize = 1; - readParams.mIsFabricFiltered = params.fabricFiltered; - readParams.mKeepSubscriptions = params.keepPreviousSubscriptions; - - std::unique_ptr callback; - std::unique_ptr readClient; - std::unique_ptr clusterStateCache; - if (clusterStateCacheContainer) { - __weak MTRClusterStateCacheContainer * weakPtr = clusterStateCacheContainer; - callback = std::make_unique(queue, attributeReportHandler, eventReportHandler, - errorHandler, resubscriptionScheduled, subscriptionEstablished, ^{ - MTRClusterStateCacheContainer * container = weakPtr; - if (container) { - container.cppClusterStateCache = nullptr; - } - }); - clusterStateCache = std::make_unique(*callback.get()); - readClient = std::make_unique(InteractionModelEngine::GetInstance(), exchangeManager, - clusterStateCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe); - } else { - callback = std::make_unique(queue, attributeReportHandler, eventReportHandler, - errorHandler, resubscriptionScheduled, subscriptionEstablished, nil); - readClient = std::make_unique(InteractionModelEngine::GetInstance(), exchangeManager, - callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe); - } - - CHIP_ERROR err; - if (!params.autoResubscribe) { - err = readClient->SendRequest(readParams); - } else { - // SendAutoResubscribeRequest cleans up the params, even on failure. - attributePath.release(); - eventPath.release(); - err = readClient->SendAutoResubscribeRequest(std::move(readParams)); - } - - if (err != CHIP_NO_ERROR) { - dispatch_async(queue, ^{ - errorHandler([MTRError errorForCHIPErrorCode:err]); - }); - - return; - } - - if (clusterStateCacheContainer) { - clusterStateCacheContainer.cppClusterStateCache = clusterStateCache.get(); - // ClusterStateCache will be deleted when OnDone is called or an error is encountered as well. - callback->AdoptClusterStateCache(std::move(clusterStateCache)); - } - // Callback and ReadClient will be deleted when OnDone is called or an error is - // encountered. - callback->AdoptReadClient(std::move(readClient)); - callback.release(); - }]; + [self.deviceController getSessionForNode:self.nodeID + completion:^(ExchangeManager * _Nullable exchangeManager, const Optional & session, + NSError * _Nullable error) { + if (error != nil) { + dispatch_async(queue, ^{ + errorHandler(error); + }); + return; + } + + // Wildcard endpoint, cluster, attribute, event. + auto attributePath = std::make_unique(); + auto eventPath = std::make_unique(); + ReadPrepareParams readParams(session.Value()); + readParams.mMinIntervalFloorSeconds = [params.minInterval unsignedShortValue]; + readParams.mMaxIntervalCeilingSeconds = [params.maxInterval unsignedShortValue]; + readParams.mpAttributePathParamsList = attributePath.get(); + readParams.mAttributePathParamsListSize = 1; + readParams.mpEventPathParamsList = eventPath.get(); + readParams.mEventPathParamsListSize = 1; + readParams.mIsFabricFiltered = params.fabricFiltered; + readParams.mKeepSubscriptions = params.keepPreviousSubscriptions; + + std::unique_ptr clusterStateCache; + ReadClient::Callback * callbackForReadClient = nullptr; + OnDoneHandler onDoneHandler = nil; + + if (clusterStateCacheContainer) { + __weak MTRClusterStateCacheContainer * weakPtr = clusterStateCacheContainer; + onDoneHandler = ^{ + // This, like all manipulation of cppClusterStateCache, needs to run on the Matter + // queue. + MTRClusterStateCacheContainer * container = weakPtr; + if (container) { + container.cppClusterStateCache = nullptr; + } + }; + } + + auto callback = std::make_unique( + ^(NSArray * value) { + dispatch_async(queue, ^{ + if (attributeReportHandler != nil) { + attributeReportHandler(value); + } + }); + }, + ^(NSArray * value) { + dispatch_async(queue, ^{ + if (eventReportHandler != nil) { + eventReportHandler(value); + } + }); + }, + ^(NSError * error) { + dispatch_async(queue, ^{ + errorHandler(error); + }); + }, + ^(NSError * error, NSNumber * resubscriptionDelay) { + dispatch_async(queue, ^{ + if (resubscriptionScheduled != nil) { + resubscriptionScheduled(error, resubscriptionDelay); + } + }); + }, + ^(void) { + dispatch_async(queue, ^{ + if (subscriptionEstablished != nil) { + subscriptionEstablished(); + } + }); + }, + onDoneHandler); + + if (clusterStateCacheContainer) { + clusterStateCache = std::make_unique(*callback.get()); + callbackForReadClient = &clusterStateCache->GetBufferedCallback(); + } else { + callbackForReadClient = &callback->GetBufferedCallback(); + } + + auto readClient = std::make_unique(InteractionModelEngine::GetInstance(), + exchangeManager, *callbackForReadClient, ReadClient::InteractionType::Subscribe); + + CHIP_ERROR err; + if (!params.autoResubscribe) { + err = readClient->SendRequest(readParams); + } else { + // SendAutoResubscribeRequest cleans up the params, even on failure. + attributePath.release(); + eventPath.release(); + err = readClient->SendAutoResubscribeRequest(std::move(readParams)); + } + + if (err != CHIP_NO_ERROR) { + dispatch_async(queue, ^{ + errorHandler([MTRError errorForCHIPErrorCode:err]); + }); + + return; + } + + if (clusterStateCacheContainer) { + clusterStateCacheContainer.cppClusterStateCache = clusterStateCache.get(); + // ClusterStateCache will be deleted when OnDone is called or an error is encountered as + // well. + callback->AdoptClusterStateCache(std::move(clusterStateCache)); + } + // Callback and ReadClient will be deleted when OnDone is called or an error is + // encountered. + callback->AdoptReadClient(std::move(readClient)); + callback.release(); + }]; } // Convert TLV data into data-value dictionary as described in MTRDeviceResponseHandler diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h index bb2348aa814486..69db80607ae15a 100644 --- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h +++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.h @@ -35,7 +35,16 @@ /** * This file defines a base class for subscription callbacks used by * MTRBaseDevice and MTRDevice. This base class handles everything except the - * actual conversion from the incoming data to the desired data. + * actual conversion from the incoming data to the desired data and the dispatch + * of callbacks to the relevant client queues. Its callbacks are called on the + * Matter queue. This allows MTRDevice and MTRBaseDevice to do any necessary + * sync cleanup work before dispatching to the client callbacks on the client + * queue. + * + * After onDoneHandler is invoked, this object will at some point delete itself + * and destroy anything it owns (such as the ReadClient or the + * ClusterStateCache). Consumers should drop references to all the relevant + * objects in that handler. This deletion will happen on the Matter queue. * * The desired data is assumed to be NSObjects that can be stored in NSArray. */ @@ -49,12 +58,10 @@ typedef void (^OnDoneHandler)(void); class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callback { public: - MTRBaseSubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, - DataReportCallback eventReportCallback, ErrorCallback errorCallback, - MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback, + MTRBaseSubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback, + ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler _Nullable resubscriptionCallback, SubscriptionEstablishedHandler _Nullable subscriptionEstablishedHandler, OnDoneHandler _Nullable onDoneHandler) - : mQueue(queue) - , mAttributeReportCallback(attributeReportCallback) + : mAttributeReportCallback(attributeReportCallback) , mEventReportCallback(eventReportCallback) , mErrorCallback(errorCallback) , mResubscriptionCallback(resubscriptionCallback) @@ -117,10 +124,9 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac NSMutableArray * _Nullable mEventReports = nil; private: - dispatch_queue_t mQueue; DataReportCallback _Nullable mAttributeReportCallback = nil; DataReportCallback _Nullable mEventReportCallback = nil; - // We set mErrorCallback to nil when queueing error reports, so we + // We set mErrorCallback to nil before calling the error callback, so we // make sure to only report one error. ErrorCallback _Nullable mErrorCallback = nil; MTRDeviceResubscriptionScheduledHandler _Nullable mResubscriptionCallback = nil; @@ -138,9 +144,11 @@ class MTRBaseSubscriptionCallback : public chip::app::ClusterStateCache::Callbac // To handle this, enforce the following rules: // // 1) We guarantee that mErrorCallback is only invoked with an error once. - // 2) We ensure that we delete ourselves and the passed in ReadClient only from OnDone or a queued-up - // error callback, but not both, by tracking whether we have a queued-up - // deletion. + // 2) We guarantee that mOnDoneHandler is only invoked once, and always + // invoked before we delete ourselves. + // 3) We ensure that we delete ourselves and the passed in ReadClient only + // from OnDone or from an error callback but not both, by tracking whether + // we have a queued-up deletion. std::unique_ptr mReadClient; std::unique_ptr mClusterStateCache; bool mHaveQueuedDeletion = false; diff --git a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm index 564c6a357d0e2b..abbd8ed9cbdc9f 100644 --- a/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm +++ b/src/darwin/Framework/CHIP/MTRBaseSubscriptionCallback.mm @@ -33,21 +33,17 @@ { __block NSArray * attributeReports = mAttributeReports; mAttributeReports = nil; - __block auto attributeCallback = mAttributeReportCallback; + auto attributeCallback = mAttributeReportCallback; __block NSArray * eventReports = mEventReports; mEventReports = nil; - __block auto eventCallback = mEventReportCallback; + auto eventCallback = mEventReportCallback; if (attributeCallback != nil && attributeReports.count) { - dispatch_async(mQueue, ^{ - attributeCallback(attributeReports); - }); + attributeCallback(attributeReports); } if (eventCallback != nil && eventReports.count) { - dispatch_async(mQueue, ^{ - eventCallback(eventReports); - }); + eventCallback(eventReports); } } @@ -96,7 +92,8 @@ void MTRBaseSubscriptionCallback::OnSubscriptionEstablished(SubscriptionId aSubscriptionId) { if (mSubscriptionEstablishedHandler) { - dispatch_async(mQueue, mSubscriptionEstablishedHandler); + auto subscriptionEstablishedHandler = mSubscriptionEstablishedHandler; + subscriptionEstablishedHandler(); } } @@ -109,9 +106,7 @@ auto callback = mResubscriptionCallback; auto error = [MTRError errorForCHIPErrorCode:aTerminationCause]; auto delayMs = @(apReadClient->ComputeTimeTillNextSubscription()); - dispatch_async(mQueue, ^{ - callback(error, delayMs); - }); + callback(error, delayMs); } return CHIP_NO_ERROR; } @@ -129,30 +124,21 @@ return; } - __block ErrorCallback callback = mErrorCallback; __block auto * myself = this; + + auto errorCallback = mErrorCallback; mErrorCallback = nil; mAttributeReportCallback = nil; mEventReportCallback = nil; - __auto_type onDoneHandler = mOnDoneHandler; + auto onDoneHandler = mOnDoneHandler; mOnDoneHandler = nil; - dispatch_async(mQueue, ^{ - callback(err); - }); + errorCallback(err); if (onDoneHandler) { - // To guarantee the async onDoneHandler call is made before - // deletion, so that clean up can happen while the callback - // object is still alive (and therefore cluster cache), queue - // deletion after calling the onDoneHandler - mHaveQueuedDeletion = true; - dispatch_async(mQueue, ^{ - onDoneHandler(); - dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{ - delete myself; - }); - }); - } else if (aCancelSubscription) { + onDoneHandler(); + } + + if (aCancelSubscription) { // We can't synchronously delete ourselves, because we're inside one of // the ReadClient callbacks and we need to outlive the callback's // execution. Queue an async deletion on the Matter queue (where we are diff --git a/src/darwin/Framework/CHIP/MTRDevice.mm b/src/darwin/Framework/CHIP/MTRDevice.mm index 1bd8fbf362faf0..4d29ef5fbb9fe7 100644 --- a/src/darwin/Framework/CHIP/MTRDevice.mm +++ b/src/darwin/Framework/CHIP/MTRDevice.mm @@ -101,10 +101,10 @@ - (id)strongObject class SubscriptionCallback final : public MTRBaseSubscriptionCallback { public: - SubscriptionCallback(dispatch_queue_t queue, DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback, + SubscriptionCallback(DataReportCallback attributeReportCallback, DataReportCallback eventReportCallback, ErrorCallback errorCallback, MTRDeviceResubscriptionScheduledHandler resubscriptionCallback, SubscriptionEstablishedHandler subscriptionEstablishedHandler, OnDoneHandler onDoneHandler) - : MTRBaseSubscriptionCallback(queue, attributeReportCallback, eventReportCallback, errorCallback, resubscriptionCallback, + : MTRBaseSubscriptionCallback(attributeReportCallback, eventReportCallback, errorCallback, resubscriptionCallback, subscriptionEstablishedHandler, onDoneHandler) { } @@ -314,30 +314,41 @@ - (void)subscribeWithMinInterval:(uint16_t)minInterval maxInterval:(uint16_t)max std::unique_ptr readClient; std::unique_ptr clusterStateCache; callback = std::make_unique( - self.queue, ^(NSArray * value) { - // OnAttributeData (after OnReportEnd) - [self _handleAttributeReport:value]; + dispatch_async(self.queue, ^{ + // OnAttributeData (after OnReportEnd) + [self _handleAttributeReport:value]; + }); }, ^(NSArray * value) { - // OnEventReport (after OnReportEnd) - [self _handleEventReport:value]; + dispatch_async(self.queue, ^{ + // OnEventReport (after OnReportEnd) + [self _handleEventReport:value]; + }); }, ^(NSError * error) { - // OnError - [self _handleSubscriptionError:error]; + dispatch_async(self.queue, ^{ + // OnError + [self _handleSubscriptionError:error]; + }); }, ^(NSError * error, NSNumber * resubscriptionDelay) { - // OnResubscriptionNeeded - [self _handleResubscriptionNeeded]; + dispatch_async(self.queue, ^{ + // OnResubscriptionNeeded + [self _handleResubscriptionNeeded]; + }); }, ^(void) { - // OnSubscriptionEstablished - [self _handleSubscriptionEstablished]; + dispatch_async(self.queue, ^{ + // OnSubscriptionEstablished + [self _handleSubscriptionEstablished]; + }); }, ^(void) { - // OnDone - [self _handleSubscriptionReset]; + dispatch_async(self.queue, ^{ + // OnDone + [self _handleSubscriptionReset]; + }); }); readClient = std::make_unique(InteractionModelEngine::GetInstance(), exchangeManager, callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe); diff --git a/src/darwin/Framework/CHIPTests/MTRDeviceTests.m b/src/darwin/Framework/CHIPTests/MTRDeviceTests.m index bf9699b7e17964..71baaee0308a8f 100644 --- a/src/darwin/Framework/CHIPTests/MTRDeviceTests.m +++ b/src/darwin/Framework/CHIPTests/MTRDeviceTests.m @@ -52,6 +52,9 @@ // Singleton controller we use. static MTRDeviceController * sController = nil; +// Keys we can use to restart the controller. +static MTRTestKeys * sTestKeys = nil; + static void WaitForCommissionee(XCTestExpectation * expectation) { MTRDeviceController * controller = sController; @@ -155,6 +158,10 @@ - (void)initStack __auto_type * testKeys = [[MTRTestKeys alloc] init]; XCTAssertNotNil(testKeys); + sTestKeys = testKeys; + + // Needs to match what createControllerOnExistingFabric calls elsewhere in + // this file do. __auto_type * params = [[MTRDeviceControllerStartupParams alloc] initWithIPK:testKeys.ipk fabricID:@(1) nocSigner:testKeys]; params.vendorID = @(kTestVendorId); @@ -1181,6 +1188,131 @@ - (void)test014_InvokeCommandWithDifferentIdResponse [self waitForExpectationsWithTimeout:kTimeoutInSeconds handler:nil]; } +- (void)test015_FailedSubscribeWithQueueAcrossShutdown +{ +#if MANUAL_INDIVIDUAL_TEST + [self initStack]; + [self waitForCommissionee]; +#endif + + MTRBaseDevice * device = GetConnectedDevice(); + dispatch_queue_t queue = dispatch_get_main_queue(); + + MTRDeviceController * controller = sController; + XCTAssertNotNil(controller); + XCTestExpectation * firstSubscribeExpectation = [self expectationWithDescription:@"First subscription complete"]; + XCTestExpectation * errorExpectation = [self expectationWithDescription:@"First subscription errored out"]; + + // Create first subscription. It needs to be using subscribeWithQueue and + // must have a clusterStateCacheContainer to exercise the onDone case. + NSLog(@"Subscribing..."); + __auto_type clusterStateCacheContainer = [[MTRClusterStateCacheContainer alloc] init]; + __auto_type * params = [[MTRSubscribeParams alloc] initWithMinInterval:@(1) maxInterval:@(2)]; + params.autoResubscribe = NO; + [device subscribeWithQueue:queue + params:params + clusterStateCacheContainer:clusterStateCacheContainer + attributeReportHandler:nil + eventReportHandler:nil + errorHandler:^(NSError * error) { + NSLog(@"Received report error: %@", error); + + // Restart the controller here, to exercise our various event queue bits. + [controller shutdown]; + + // Wait a bit before restart, to allow whatever async things are going on after this is called to try to happen. + dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC), queue, ^{ + __auto_type * factory = [MTRDeviceControllerFactory sharedInstance]; + XCTAssertNotNil(factory); + + // Needs to match what initStack does. + __auto_type * params = [[MTRDeviceControllerStartupParams alloc] initWithIPK:sTestKeys.ipk + fabricID:@(1) + nocSigner:sTestKeys]; + __auto_type * newController = [factory createControllerOnExistingFabric:params error:nil]; + XCTAssertNotNil(newController); + + sController = newController; + mConnectedDevice = [MTRBaseDevice deviceWithNodeID:@(kDeviceId) controller:newController]; + [errorExpectation fulfill]; + }); + } + subscriptionEstablished:^() { + [firstSubscribeExpectation fulfill]; + } + resubscriptionScheduled:nil]; + [self waitForExpectations:@[ firstSubscribeExpectation ] timeout:60]; + + // Create second subscription which will cancel the first subscription. We + // can use a non-existent path here to cut down on the work that gets done. + [device subscribeAttributePathWithEndpointID:@10000 + clusterID:@6 + attributeID:@0 + params:params + queue:queue + reportHandler:^(id _Nullable values, NSError * _Nullable error) { + } + subscriptionEstablished:^ { + }]; + [self waitForExpectations:@[ errorExpectation ] timeout:60]; +} + +- (void)test016_FailedSubscribeWithCacheReadDuringFailure +{ +#if MANUAL_INDIVIDUAL_TEST + [self initStack]; + [self waitForCommissionee]; +#endif + + MTRBaseDevice * device = GetConnectedDevice(); + dispatch_queue_t queue = dispatch_get_main_queue(); + + MTRDeviceController * controller = sController; + XCTAssertNotNil(controller); + XCTestExpectation * firstSubscribeExpectation = [self expectationWithDescription:@"First subscription complete"]; + XCTestExpectation * errorExpectation = [self expectationWithDescription:@"First subscription errored out"]; + + // Create first subscription. It needs to be using subscribeWithQueue and + // must have a clusterStateCacheContainer to exercise the onDone case. + NSLog(@"Subscribing..."); + __auto_type clusterStateCacheContainer = [[MTRClusterStateCacheContainer alloc] init]; + __auto_type * params = [[MTRSubscribeParams alloc] initWithMinInterval:@(1) maxInterval:@(2)]; + params.autoResubscribe = NO; + [device subscribeWithQueue:queue + params:params + clusterStateCacheContainer:clusterStateCacheContainer + attributeReportHandler:nil + eventReportHandler:nil + errorHandler:^(NSError * error) { + NSLog(@"Received report error: %@", error); + + [MTRBaseClusterOnOff readAttributeOnOffWithClusterStateCache:clusterStateCacheContainer + endpoint:@1 + queue:queue + completion:^(NSNumber * _Nullable value, NSError * _Nullable error) { + [errorExpectation fulfill]; + }]; + } + subscriptionEstablished:^() { + [firstSubscribeExpectation fulfill]; + } + resubscriptionScheduled:nil]; + [self waitForExpectations:@[ firstSubscribeExpectation ] timeout:60]; + + // Create second subscription which will cancel the first subscription. We + // can use a non-existent path here to cut down on the work that gets done. + [device subscribeAttributePathWithEndpointID:@10000 + clusterID:@6 + attributeID:@0 + params:params + queue:queue + reportHandler:^(id _Nullable values, NSError * _Nullable error) { + } + subscriptionEstablished:^ { + }]; + [self waitForExpectations:@[ errorExpectation ] timeout:60]; +} + - (void)test900_SubscribeAllAttributes { #if MANUAL_INDIVIDUAL_TEST