Skip to content

Commit

Permalink
Add support for caching events to the AttributeCache (#17030)
Browse files Browse the repository at this point in the history
* s/AttributeCache/ClusterStateCache/g in preparation of the event caching getting subsumed

* Add EventCaching support to the ClusterStateCache.

* Review feedback

* Reverted the rename of the Darwin files for now since it was starting to get a bit out of hand

* Apply suggestions from code review

Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>

* Review feedback

Co-authored-by: Boris Zbarsky <bzbarsky@apple.com>
  • Loading branch information
mrjerryjohns and bzbarsky-apple authored Apr 14, 2022
1 parent 579efb3 commit 964adbc
Show file tree
Hide file tree
Showing 15 changed files with 810 additions and 109 deletions.
2 changes: 1 addition & 1 deletion scripts/tools/check_includes_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@
ALLOW: Dict[str, Set[str]] = {

# Not intended for embedded clients (#11705).
'src/app/AttributeCache.h': {'list', 'map', 'set', 'vector'},
'src/app/ClusterStateCache.h': {'list', 'map', 'set', 'vector', 'queue'},
'src/app/BufferedReadCallback.h': {'vector'},

# Itself in DENY.
Expand Down
4 changes: 2 additions & 2 deletions src/app/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ static_library("app") {

sources = [
"AttributeAccessInterface.cpp",
"AttributeCache.cpp",
"AttributeCache.h",
"AttributePathExpandIterator.cpp",
"AttributePathExpandIterator.h",
"AttributePathParams.h",
Expand All @@ -67,6 +65,8 @@ static_library("app") {
"CASESessionManager.h",
"ChunkedWriteCallback.cpp",
"ChunkedWriteCallback.h",
"ClusterStateCache.cpp",
"ClusterStateCache.h",
"CommandHandler.cpp",
"CommandResponseHelper.h",
"CommandSender.cpp",
Expand Down
198 changes: 150 additions & 48 deletions src/app/AttributeCache.cpp → src/app/ClusterStateCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
*/

#include "system/SystemPacketBuffer.h"
#include <app/AttributeCache.h>
#include <app/ClusterStateCache.h>
#include <app/InteractionModelEngine.h>
#include <tuple>

namespace chip {
namespace app {

CHIP_ERROR AttributeCache::UpdateCache(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus)
CHIP_ERROR ClusterStateCache::UpdateCache(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData,
const StatusIB & aStatus)
{
AttributeState state;
System::PacketBufferHandle handle;
Expand Down Expand Up @@ -85,6 +86,7 @@ CHIP_ERROR AttributeCache::UpdateCache(const ConcreteDataAttributePath & aPath,
{
mCache[aPath.mEndpointId][aPath.mClusterId].mPendingDataVersion = aPath.mDataVersion;
}

mLastReportDataPath = aPath;
}
else
Expand All @@ -106,15 +108,57 @@ CHIP_ERROR AttributeCache::UpdateCache(const ConcreteDataAttributePath & aPath,
return CHIP_NO_ERROR;
}

void AttributeCache::OnReportBegin()
CHIP_ERROR ClusterStateCache::UpdateEventCache(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus)
{
if (apData)
{
//
// If we've already seen this event before, there's no more work to be done.
//
if (mHighestReceivedEventNumber.HasValue() && aEventHeader.mEventNumber <= mHighestReceivedEventNumber.Value())
{
return CHIP_NO_ERROR;
}

System::PacketBufferHandle handle = System::PacketBufferHandle::New(chip::app::kMaxSecureSduLengthBytes);

System::PacketBufferTLVWriter writer;
writer.Init(std::move(handle), false);

ReturnErrorOnFailure(writer.CopyElement(TLV::AnonymousTag(), *apData));
ReturnErrorOnFailure(writer.Finalize(&handle));

//
// Compact the buffer down to a more reasonably sized packet buffer
// if we can.
//
handle.RightSize();

EventData eventData;
eventData.first = aEventHeader;
eventData.second = std::move(handle);

mEventDataCache.insert(std::move(eventData));

mHighestReceivedEventNumber.SetValue(aEventHeader.mEventNumber);
}
else if (apStatus)
{
mEventStatusCache[aEventHeader.mPath] = *apStatus;
}

return CHIP_NO_ERROR;
}

void ClusterStateCache::OnReportBegin()
{
mLastReportDataPath = ConcreteClusterPath(kInvalidEndpointId, kInvalidClusterId);
mChangedAttributeSet.clear();
mAddedEndpoints.clear();
mCallback.OnReportBegin();
}

void AttributeCache::CommitPendingDataVersion()
void ClusterStateCache::CommitPendingDataVersion()
{
if (!mLastReportDataPath.IsValidConcreteClusterPath())
{
Expand All @@ -129,7 +173,7 @@ void AttributeCache::CommitPendingDataVersion()
}
}

void AttributeCache::OnReportEnd()
void ClusterStateCache::OnReportEnd()
{
CommitPendingDataVersion();
mLastReportDataPath = ConcreteClusterPath(kInvalidEndpointId, kInvalidClusterId);
Expand Down Expand Up @@ -158,37 +202,7 @@ void AttributeCache::OnReportEnd()
mCallback.OnReportEnd();
}

void AttributeCache::OnAttributeData(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus)
{
//
// Since the cache itself is a ReadClient::Callback, it may be incorrectly passed in directly when registering with the
// ReadClient. This should be avoided, since that bypasses the built-in buffered reader adapter callback that is needed for
// lists to work correctly.
//
// Instead, the right callback should be retrieved using GetBufferedCallback().
//
// To catch such errors, we validate that the provided concrete path never indicates a raw list item operation (which the
// buffered reader will handle and convert for us).
//
//
VerifyOrDie(!aPath.IsListItemOperation());

// Copy the reader for forwarding
TLV::TLVReader dataSnapshot;
if (apData)
{
dataSnapshot.Init(*apData);
}

UpdateCache(aPath, apData, aStatus);

//
// Forward the call through.
//
mCallback.OnAttributeData(aPath, apData ? &dataSnapshot : nullptr, aStatus);
}

CHIP_ERROR AttributeCache::Get(const ConcreteAttributePath & path, TLV::TLVReader & reader)
CHIP_ERROR ClusterStateCache::Get(const ConcreteAttributePath & path, TLV::TLVReader & reader)
{
CHIP_ERROR err;

Expand All @@ -209,16 +223,23 @@ CHIP_ERROR AttributeCache::Get(const ConcreteAttributePath & path, TLV::TLVReade
return CHIP_NO_ERROR;
}

CHIP_ERROR AttributeCache::GetVersion(EndpointId mEndpointId, ClusterId mClusterId, Optional<DataVersion> & aVersion)
CHIP_ERROR ClusterStateCache::Get(EventNumber eventNumber, TLV::TLVReader & reader)
{
CHIP_ERROR err;
auto clusterState = GetClusterState(mEndpointId, mClusterId, err);

auto eventData = GetEventData(eventNumber, err);
ReturnErrorOnFailure(err);
aVersion = clusterState->mCommittedDataVersion;

System::PacketBufferTLVReader bufReader;

bufReader.Init(eventData->second.Retain());
ReturnErrorOnFailure(bufReader.Next());

reader.Init(bufReader);
return CHIP_NO_ERROR;
}

AttributeCache::EndpointState * AttributeCache::GetEndpointState(EndpointId endpointId, CHIP_ERROR & err)
ClusterStateCache::EndpointState * ClusterStateCache::GetEndpointState(EndpointId endpointId, CHIP_ERROR & err)
{
auto endpointIter = mCache.find(endpointId);
if (endpointIter == mCache.end())
Expand All @@ -231,7 +252,7 @@ AttributeCache::EndpointState * AttributeCache::GetEndpointState(EndpointId endp
return &endpointIter->second;
}

AttributeCache::ClusterState * AttributeCache::GetClusterState(EndpointId endpointId, ClusterId clusterId, CHIP_ERROR & err)
ClusterStateCache::ClusterState * ClusterStateCache::GetClusterState(EndpointId endpointId, ClusterId clusterId, CHIP_ERROR & err)
{
auto endpointState = GetEndpointState(endpointId, err);
if (err != CHIP_NO_ERROR)
Expand All @@ -250,8 +271,8 @@ AttributeCache::ClusterState * AttributeCache::GetClusterState(EndpointId endpoi
return &clusterState->second;
}

AttributeCache::AttributeState * AttributeCache::GetAttributeState(EndpointId endpointId, ClusterId clusterId,
AttributeId attributeId, CHIP_ERROR & err)
const ClusterStateCache::AttributeState * ClusterStateCache::GetAttributeState(EndpointId endpointId, ClusterId clusterId,
AttributeId attributeId, CHIP_ERROR & err)
{
auto clusterState = GetClusterState(endpointId, clusterId, err);
if (err != CHIP_NO_ERROR)
Expand All @@ -270,7 +291,76 @@ AttributeCache::AttributeState * AttributeCache::GetAttributeState(EndpointId en
return &attributeState->second;
}

CHIP_ERROR AttributeCache::GetStatus(const ConcreteAttributePath & path, StatusIB & status)
const ClusterStateCache::EventData * ClusterStateCache::GetEventData(EventNumber eventNumber, CHIP_ERROR & err)
{
EventData compareKey;

compareKey.first.mEventNumber = eventNumber;
auto eventData = mEventDataCache.find(std::move(compareKey));
if (eventData == mEventDataCache.end())
{
err = CHIP_ERROR_KEY_NOT_FOUND;
return nullptr;
}

err = CHIP_NO_ERROR;
return &(*eventData);
}

void ClusterStateCache::OnAttributeData(const ConcreteDataAttributePath & aPath, TLV::TLVReader * apData, const StatusIB & aStatus)
{
//
// Since the cache itself is a ReadClient::Callback, it may be incorrectly passed in directly when registering with the
// ReadClient. This should be avoided, since that bypasses the built-in buffered reader adapter callback that is needed for
// lists to work correctly.
//
// Instead, the right callback should be retrieved using GetBufferedCallback().
//
// To catch such errors, we validate that the provided concrete path never indicates a raw list item operation (which the
// buffered reader will handle and convert for us).
//
//
VerifyOrDie(!aPath.IsListItemOperation());

// Copy the reader for forwarding
TLV::TLVReader dataSnapshot;
if (apData)
{
dataSnapshot.Init(*apData);
}

UpdateCache(aPath, apData, aStatus);

//
// Forward the call through.
//
mCallback.OnAttributeData(aPath, apData ? &dataSnapshot : nullptr, aStatus);
}

CHIP_ERROR ClusterStateCache::GetVersion(EndpointId mEndpointId, ClusterId mClusterId, Optional<DataVersion> & aVersion)
{
CHIP_ERROR err;
auto clusterState = GetClusterState(mEndpointId, mClusterId, err);
ReturnErrorOnFailure(err);
aVersion = clusterState->mCommittedDataVersion;
return CHIP_NO_ERROR;
}

void ClusterStateCache::OnEventData(const EventHeader & aEventHeader, TLV::TLVReader * apData, const StatusIB * apStatus)
{
VerifyOrDie(apData != nullptr || apStatus != nullptr);

TLV::TLVReader dataSnapshot;
if (apData)
{
dataSnapshot.Init(*apData);
}

UpdateEventCache(aEventHeader, apData, apStatus);
mCallback.OnEventData(aEventHeader, apData ? &dataSnapshot : nullptr, apStatus);
}

CHIP_ERROR ClusterStateCache::GetStatus(const ConcreteAttributePath & path, StatusIB & status)
{
CHIP_ERROR err;

Expand All @@ -286,7 +376,19 @@ CHIP_ERROR AttributeCache::GetStatus(const ConcreteAttributePath & path, StatusI
return CHIP_NO_ERROR;
}

void AttributeCache::GetSortedFilters(std::vector<std::pair<DataVersionFilter, size_t>> & aVector)
CHIP_ERROR ClusterStateCache::GetStatus(const ConcreteEventPath & path, StatusIB & status)
{
auto statusIter = mEventStatusCache.find(path);
if (statusIter == mEventStatusCache.end())
{
return CHIP_ERROR_KEY_NOT_FOUND;
}

status = statusIter->second;
return CHIP_NO_ERROR;
}

void ClusterStateCache::GetSortedFilters(std::vector<std::pair<DataVersionFilter, size_t>> & aVector)
{
for (auto const & endpointIter : mCache)
{
Expand Down Expand Up @@ -342,9 +444,9 @@ void AttributeCache::GetSortedFilters(std::vector<std::pair<DataVersionFilter, s
});
}

CHIP_ERROR AttributeCache::OnUpdateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
const Span<AttributePathParams> & aAttributePaths,
bool & aEncodedDataVersionList)
CHIP_ERROR ClusterStateCache::OnUpdateDataVersionFilterList(DataVersionFilterIBs::Builder & aDataVersionFilterIBsBuilder,
const Span<AttributePathParams> & aAttributePaths,
bool & aEncodedDataVersionList)
{
CHIP_ERROR err = CHIP_NO_ERROR;
TLV::TLVWriter backup;
Expand Down
Loading

0 comments on commit 964adbc

Please sign in to comment.