Skip to content

Commit

Permalink
ExchangeHolder: A RAII'fied, safer, hands-free construct for EC clean…
Browse files Browse the repository at this point in the history
…-up management (#20237)

* ExchangeHolder

An object for managing clean-up of ExchangeContexts.

* Review feedback

* Fixed up ReadHandler as well
  • Loading branch information
mrjerryjohns authored Jul 21, 2022
1 parent d5fd062 commit e1110c1
Show file tree
Hide file tree
Showing 14 changed files with 697 additions and 179 deletions.
4 changes: 2 additions & 2 deletions src/app/InteractionModelEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,7 @@ bool InteractionModelEngine::TrimFabricForSubscriptions(FabricIndex aFabricIndex
eventPathsSubscribedByCurrentFabric > perFabricPathCapacity ||
subscriptionsEstablishedByCurrentFabric > perFabricSubscriptionCapacity))
{
candidate->Abort();
candidate->Close();
return true;
}
return false;
Expand Down Expand Up @@ -847,7 +847,7 @@ bool InteractionModelEngine::TrimFabricForRead(FabricIndex aFabricIndex)
// Always evict the transactions on PASE sessions if the fabric table is full.
(aFabricIndex == kUndefinedFabricIndex && mpFabricTable->FabricCount() == GetConfigMaxFabrics())))
{
candidate->Abort();
candidate->Close();
return true;
}
return false;
Expand Down
101 changes: 28 additions & 73 deletions src/app/ReadClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ static void DefaultResubscribePolicy(uint32_t aNumCumulativeRetries, uint32_t &

ReadClient::ReadClient(InteractionModelEngine * apImEngine, Messaging::ExchangeManager * apExchangeMgr, Callback & apCallback,
InteractionType aInteractionType) :
mExchange(*this),
mpCallback(apCallback)
{
// Error if already initialized.
Expand Down Expand Up @@ -110,8 +111,6 @@ void ReadClient::StopResubscription()

ReadClient::~ReadClient()
{
Abort();

if (IsSubscriptionType())
{
CancelLivenessCheckTimer();
Expand All @@ -129,18 +128,6 @@ ReadClient::~ReadClient()

void ReadClient::Close(CHIP_ERROR aError)
{
// OnDone below can destroy us before we unwind all the way back into the
// exchange code and it tries to close itself. Make sure that it doesn't
// try to notify us that it's closing, since we will be dead.
//
// For more details, see #10344.
if (mpExchangeCtx != nullptr)
{
mpExchangeCtx->SetDelegate(nullptr);
}

mpExchangeCtx = nullptr;

if (IsReadType())
{
if (aError != CHIP_NO_ERROR)
Expand Down Expand Up @@ -284,20 +271,22 @@ CHIP_ERROR ReadClient::SendReadRequest(ReadPrepareParams & aReadPrepareParams)

VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);

mpExchangeCtx = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
VerifyOrReturnError(mpExchangeCtx != nullptr, err = CHIP_ERROR_NO_MEMORY);
auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
VerifyOrReturnError(exchange != nullptr, err = CHIP_ERROR_NO_MEMORY);

mExchange.Grab(exchange);

if (aReadPrepareParams.mTimeout == System::Clock::kZero)
{
mpExchangeCtx->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
}
else
{
mpExchangeCtx->SetResponseTimeout(aReadPrepareParams.mTimeout);
mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
}

ReturnErrorOnFailure(mpExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf),
Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::ReadRequest, std::move(msgBuf),
Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));

mPeerNodeId = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeerNodeId();
mFabricIndex = aReadPrepareParams.mSessionHolder->GetFabricIndex();
Expand Down Expand Up @@ -355,6 +344,7 @@ CHIP_ERROR ReadClient::BuildDataVersionFilterList(DataVersionFilterIBs::Builder
break;
}
}

if (!intersected)
{
continue;
Expand Down Expand Up @@ -405,20 +395,13 @@ CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchange
}
else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::SubscribeResponse))
{
VerifyOrExit(apExchangeContext == mpExchangeCtx, err = CHIP_ERROR_INCORRECT_STATE);
VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
err = ProcessSubscribeResponse(std::move(aPayload));
SuccessOrExit(err);

//
// Null out the delegate and context as SubscribeResponse is the last message the Subscribe transaction and
// the exchange layer will automatically close the exchange.
//
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx = nullptr;
}
else if (aPayloadHeader.HasMessageType(Protocols::InteractionModel::MsgType::StatusResponse))
{
VerifyOrExit(apExchangeContext == mpExchangeCtx, err = CHIP_ERROR_INCORRECT_STATE);
VerifyOrExit(apExchangeContext == mExchange.Get(), err = CHIP_ERROR_INCORRECT_STATE);
err = StatusResponse::ProcessStatusResponse(std::move(aPayload));
SuccessOrExit(err);
}
Expand All @@ -436,38 +419,10 @@ CHIP_ERROR ReadClient::OnMessageReceived(Messaging::ExchangeContext * apExchange
return err;
}

void ReadClient::Abort()
{
//
// If the exchange context hasn't already been gracefully closed
// (signaled by setting it to null), then we need to forcibly
// tear it down.
//
if (mpExchangeCtx != nullptr)
{
// We might be a delegate for this exchange, and we don't want the
// OnExchangeClosing notification in that case. Null out the delegate
// to avoid that.
//
// TODO: This makes all sorts of assumptions about what the delegate is
// (notice the "might" above!) that might not hold in practice. We
// really need a better solution here....
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx->Abort();
mpExchangeCtx = nullptr;
}
}

CHIP_ERROR ReadClient::OnUnsolicitedReportData(Messaging::ExchangeContext * apExchangeContext,
System::PacketBufferHandle && aPayload)
{
mpExchangeCtx = apExchangeContext;

//
// Let's take over further message processing on this exchange from the IM.
// This is only relevant for reports during post-subscription.
//
mpExchangeCtx->SetDelegate(this);
mExchange.Grab(apExchangeContext);

CHIP_ERROR err = ProcessReportData(std::move(aPayload));
if (err != CHIP_NO_ERROR)
Expand Down Expand Up @@ -591,12 +546,7 @@ CHIP_ERROR ReadClient::ProcessReportData(System::PacketBufferHandle && aPayload)
bool noResponseExpected = IsSubscriptionActive() && !mPendingMoreChunks;
err = StatusResponse::Send(err == CHIP_NO_ERROR ? Protocols::InteractionModel::Status::Success
: Protocols::InteractionModel::Status::InvalidSubscription,
mpExchangeCtx, !noResponseExpected);

if (noResponseExpected || (err != CHIP_NO_ERROR))
{
mpExchangeCtx = nullptr;
}
mExchange.Get(), !noResponseExpected);
}

mIsPrimingReports = false;
Expand Down Expand Up @@ -756,11 +706,14 @@ CHIP_ERROR ReadClient::ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsRea
CHIP_ERROR ReadClient::RefreshLivenessCheckTimer()
{
CHIP_ERROR err = CHIP_NO_ERROR;

CancelLivenessCheckTimer();
VerifyOrReturnError(mpExchangeCtx != nullptr, err = CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mpExchangeCtx->HasSessionHandle(), err = CHIP_ERROR_INCORRECT_STATE);

System::Clock::Timeout timeout = System::Clock::Seconds16(mMaxInterval) + mpExchangeCtx->GetSessionHandle()->GetAckTimeout();
VerifyOrReturnError(mExchange, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mExchange->HasSessionHandle(), CHIP_ERROR_INCORRECT_STATE);

System::Clock::Timeout timeout = System::Clock::Seconds16(mMaxInterval) + mExchange->GetSessionHandle()->GetAckTimeout();

// EFR32/MBED/INFINION/K32W's chrono count return long unsinged, but other platform returns unsigned
ChipLogProgress(DataManagement,
"Refresh LivenessCheckTime for %lu milliseconds with SubscriptionId = 0x%08" PRIx32
Expand Down Expand Up @@ -944,20 +897,22 @@ CHIP_ERROR ReadClient::SendSubscribeRequestImpl(const ReadPrepareParams & aReadP

VerifyOrReturnError(aReadPrepareParams.mSessionHolder, CHIP_ERROR_MISSING_SECURE_SESSION);

mpExchangeCtx = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
VerifyOrReturnError(mpExchangeCtx != nullptr, CHIP_ERROR_NO_MEMORY);
auto exchange = mpExchangeMgr->NewContext(aReadPrepareParams.mSessionHolder.Get().Value(), this);
VerifyOrReturnError(exchange != nullptr, CHIP_ERROR_NO_MEMORY);

mExchange.Grab(exchange);

if (aReadPrepareParams.mTimeout == System::Clock::kZero)
{
mpExchangeCtx->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
mExchange->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
}
else
{
mpExchangeCtx->SetResponseTimeout(aReadPrepareParams.mTimeout);
mExchange->SetResponseTimeout(aReadPrepareParams.mTimeout);
}

ReturnErrorOnFailure(mpExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));
ReturnErrorOnFailure(mExchange->SendMessage(Protocols::InteractionModel::MsgType::SubscribeRequest, std::move(msgBuf),
Messaging::SendFlags(Messaging::SendMessageFlags::kExpectResponse)));

mPeerNodeId = aReadPrepareParams.mSessionHolder->AsSecureSession()->GetPeerNodeId();
mFabricIndex = aReadPrepareParams.mSessionHolder->GetFabricIndex();
Expand Down
4 changes: 2 additions & 2 deletions src/app/ReadClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <lib/support/DLLUtil.h>
#include <lib/support/logging/CHIPLogging.h>
#include <messaging/ExchangeContext.h>
#include <messaging/ExchangeHolder.h>
#include <messaging/ExchangeMgr.h>
#include <messaging/Flags.h>
#include <protocols/Protocols.h>
Expand Down Expand Up @@ -368,7 +369,6 @@ class ReadClient : public Messaging::ExchangeDelegate
CHIP_ERROR ProcessAttributeReportIBs(TLV::TLVReader & aAttributeDataIBsReader);
CHIP_ERROR ProcessEventReportIBs(TLV::TLVReader & aEventReportIBsReader);

void ClearExchangeContext() { mpExchangeCtx = nullptr; }
static void OnLivenessTimeoutCallback(System::Layer * apSystemLayer, void * apAppState);
CHIP_ERROR ProcessSubscribeResponse(System::PacketBufferHandle && aPayload);
CHIP_ERROR RefreshLivenessCheckTimer();
Expand Down Expand Up @@ -415,7 +415,7 @@ class ReadClient : public Messaging::ExchangeDelegate
CHIP_ERROR GetMinEventNumber(const ReadPrepareParams & aReadPrepareParams, Optional<EventNumber> & aEventMin);

Messaging::ExchangeManager * mpExchangeMgr = nullptr;
Messaging::ExchangeContext * mpExchangeCtx = nullptr;
Messaging::ExchangeHolder mExchange;
Callback & mpCallback;
ClientState mState = ClientState::Idle;
bool mIsReporting = false;
Expand Down
Loading

0 comments on commit e1110c1

Please sign in to comment.