Skip to content

Commit

Permalink
Fix some threading issues on Darwin. (#20197)
Browse files Browse the repository at this point in the history
* Fix some threading issues on Darwin.

There were two places where we were touching SDK data structures from
the wrong event queue.

* Address review comments
  • Loading branch information
bzbarsky-apple authored Jul 1, 2022
1 parent cccd145 commit 6fd06ea
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,11 @@ class TestCommandBridge : public CHIPCommandBridge,

SetIdentity(identity);

// Disconnect our existing device; otherwise getConnectedDevice will
// just hand it right back to us without establishing a new CASE
// Invalidate our existing CASE session; otherwise getConnectedDevice
// will just hand it right back to us without establishing a new CASE
// session.
if (GetDevice(identity) != nil) {
auto device = [GetDevice(identity) internalDevice];
if (device != nullptr) {
device->Disconnect();
}
[GetDevice(identity) invalidateCASESession];
mConnectedDevices[identity] = nil;
}

Expand Down
142 changes: 77 additions & 65 deletions src/darwin/Framework/CHIP/CHIPDevice.mm
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ - (instancetype)initWithDevice:(chip::DeviceProxy *)device
return _cppDevice;
}

- (void)invalidateCASESession
{
dispatch_sync(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
DeviceProxy * device = [self internalDevice];
if (device != nullptr) {
device->Disconnect();
}
});
}

typedef void (^ReportCallback)(NSArray * _Nullable value, NSError * _Nullable error);
typedef void (^DataReportCallback)(NSArray * value);
typedef void (^ErrorCallback)(NSError * error);
Expand Down Expand Up @@ -371,76 +381,78 @@ - (void)subscribeWithQueue:(dispatch_queue_t)queue
errorHandler:(void (^)(NSError * error))errorHandler
subscriptionEstablished:(nullable void (^)(void))subscriptionEstablishedHandler
{
DeviceProxy * device = [self internalDevice];
if (!device) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:CHIP_ERROR_INCORRECT_STATE]);
});
return;
}

// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(device->GetSecureSession().Value());
readParams.mMinIntervalFloorSeconds = minInterval;
readParams.mMaxIntervalCeilingSeconds = maxInterval;
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mKeepSubscriptions
= (params != nil) && (params.keepPreviousSubscriptions != nil) && [params.keepPreviousSubscriptions boolValue];

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> attributeCache;
if (attributeCacheContainer) {
__weak CHIPAttributeCacheContainer * weakPtr = attributeCacheContainer;
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler, ^{
CHIPAttributeCacheContainer * container = weakPtr;
if (container) {
container.cppAttributeCache = nullptr;
}
dispatch_async(DeviceLayer::PlatformMgrImpl().GetWorkQueue(), ^{
DeviceProxy * device = [self internalDevice];
if (!device) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:CHIP_ERROR_INCORRECT_STATE]);
});
attributeCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}
return;
}

CHIP_ERROR err;
if (params != nil && params.autoResubscribe != nil && ![params.autoResubscribe boolValue]) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}
// Wildcard endpoint, cluster, attribute, event.
auto attributePath = std::make_unique<AttributePathParams>();
auto eventPath = std::make_unique<EventPathParams>();
ReadPrepareParams readParams(device->GetSecureSession().Value());
readParams.mMinIntervalFloorSeconds = minInterval;
readParams.mMaxIntervalCeilingSeconds = maxInterval;
readParams.mpAttributePathParamsList = attributePath.get();
readParams.mAttributePathParamsListSize = 1;
readParams.mpEventPathParamsList = eventPath.get();
readParams.mEventPathParamsListSize = 1;
readParams.mKeepSubscriptions
= (params != nil) && (params.keepPreviousSubscriptions != nil) && [params.keepPreviousSubscriptions boolValue];

std::unique_ptr<SubscriptionCallback> callback;
std::unique_ptr<ReadClient> readClient;
std::unique_ptr<ClusterStateCache> attributeCache;
if (attributeCacheContainer) {
__weak CHIPAttributeCacheContainer * weakPtr = attributeCacheContainer;
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler, ^{
CHIPAttributeCacheContainer * container = weakPtr;
if (container) {
container.cppAttributeCache = nullptr;
}
});
attributeCache = std::make_unique<ClusterStateCache>(*callback.get());
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
attributeCache->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
} else {
callback = std::make_unique<SubscriptionCallback>(
queue, attributeReportHandler, eventReportHandler, errorHandler, subscriptionEstablishedHandler);
readClient = std::make_unique<ReadClient>(InteractionModelEngine::GetInstance(), device->GetExchangeManager(),
callback->GetBufferedCallback(), ReadClient::InteractionType::Subscribe);
}

if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:err]);
});
CHIP_ERROR err;
if (params != nil && params.autoResubscribe != nil && ![params.autoResubscribe boolValue]) {
err = readClient->SendRequest(readParams);
} else {
// SendAutoResubscribeRequest cleans up the params, even on failure.
attributePath.release();
eventPath.release();
err = readClient->SendAutoResubscribeRequest(std::move(readParams));
}

return;
}
if (err != CHIP_NO_ERROR) {
dispatch_async(queue, ^{
errorHandler([CHIPError errorForCHIPErrorCode:err]);
});

if (attributeCacheContainer) {
attributeCacheContainer.cppAttributeCache = attributeCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptAttributeCache(std::move(attributeCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
return;
}

if (attributeCacheContainer) {
attributeCacheContainer.cppAttributeCache = attributeCache.get();
// ClusterStateCache will be deleted when OnDone is called or an error is encountered as well.
callback->AdoptAttributeCache(std::move(attributeCache));
}
// Callback and ReadClient will be deleted when OnDone is called or an error is
// encountered.
callback->AdoptReadClient(std::move(readClient));
callback.release();
});
}

// Convert TLV data into NSObject
Expand Down
7 changes: 7 additions & 0 deletions src/darwin/Framework/CHIP/CHIPDevice_Internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ NS_ASSUME_NONNULL_BEGIN
- (instancetype)initWithDevice:(chip::DeviceProxy *)device;
- (chip::DeviceProxy *)internalDevice;

/**
* Invalidate the CASE session, so an attempt to getConnectedDevice for this
* device id will have to create a new CASE session. Ideally this API will go
* away.
*/
- (void)invalidateCASESession;

@end

@interface CHIPAttributePath ()
Expand Down

0 comments on commit 6fd06ea

Please sign in to comment.