Skip to content

Commit

Permalink
Move CommandSender/Handler and WriteClient/Handler over to `ExchangeH…
Browse files Browse the repository at this point in the history
…older` (#21081)

* Move CommandSender/Handler and WriteClient/Handler over to using the new
but safer, ExchangeHolder way of EC management.

* Update src/app/CommandHandler.h
  • Loading branch information
mrjerryjohns authored and pull[bot] committed Jul 10, 2023
1 parent 2ed877d commit 1201632
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 270 deletions.
77 changes: 14 additions & 63 deletions src/app/CommandHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
namespace chip {
namespace app {

CommandHandler::CommandHandler(Callback * apCallback) : mpCallback(apCallback), mSuppressResponse(false) {}
CommandHandler::CommandHandler(Callback * apCallback) : mExchangeCtx(*this), mpCallback(apCallback), mSuppressResponse(false) {}

CHIP_ERROR CommandHandler::AllocateBuffer()
{
Expand Down Expand Up @@ -73,15 +73,15 @@ CHIP_ERROR CommandHandler::OnInvokeCommandRequest(Messaging::ExchangeContext * e

// NOTE: we already know this is an InvokeCommand Request message because we explicitly registered with the
// Exchange Manager for unsolicited InvokeCommand Requests.
mpExchangeCtx = ec;
mExchangeCtx.Grab(ec);

// Use the RAII feature, if this is the only Handle when this function returns, DecrementHoldOff will trigger sending response.
// TODO: This is broken! If something under here returns error, we will try
// to SendCommandResponse(), and then our caller will try to send a status
// response too. Figure out at what point it's our responsibility to
// handler errors vs our caller's.
Handle workHandle(this);
mpExchangeCtx->WillSendMessage();
mExchangeCtx->WillSendMessage();
ReturnErrorOnFailure(ProcessInvokeRequest(std::move(payload), isTimedInvoke));

return CHIP_NO_ERROR;
Expand All @@ -103,26 +103,19 @@ CHIP_ERROR CommandHandler::ProcessInvokeRequest(System::PacketBufferHandle && pa
ReturnErrorOnFailure(invokeRequestMessage.GetTimedRequest(&mTimedRequest));
ReturnErrorOnFailure(invokeRequestMessage.GetInvokeRequests(&invokeRequests));

VerifyOrReturnError(mpExchangeCtx != nullptr, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mExchangeCtx, CHIP_ERROR_INCORRECT_STATE);

if (mTimedRequest != isTimedInvoke)
{
// The message thinks it should be part of a timed interaction but it's
// not, or vice versa. Spec says to Respond with UNSUPPORTED_ACCESS.
err = StatusResponse::Send(Protocols::InteractionModel::Status::UnsupportedAccess, mpExchangeCtx,
err = StatusResponse::Send(Protocols::InteractionModel::Status::UnsupportedAccess, mExchangeCtx.Get(),
/* aExpectResponse = */ false);

if (err != CHIP_NO_ERROR)
if (err == CHIP_NO_ERROR)
{
// We have to manually close the exchange, because we called
// WillSendMessage already.
mpExchangeCtx->Close();
mSentStatusResponse = true;
}

// Null out the (now-closed) exchange, so that when we try to
// SendCommandResponse() later (when our holdoff count drops to 0) it
// just fails and we don't double-respond.
mpExchangeCtx = nullptr;
return err;
}

Expand All @@ -142,7 +135,7 @@ CHIP_ERROR CommandHandler::ProcessInvokeRequest(System::PacketBufferHandle && pa
CommandDataIB::Parser commandData;
ReturnErrorOnFailure(commandData.Init(invokeRequestsReader));

if (mpExchangeCtx->IsGroupExchangeContext())
if (mExchangeCtx->IsGroupExchangeContext())
{
ReturnErrorOnFailure(ProcessGroupCommandDataIB(commandData));
}
Expand Down Expand Up @@ -172,18 +165,6 @@ void CommandHandler::Close()
VerifyOrDieWithMsg(mPendingWork == 0, DataManagement, "CommandHandler::Close() called with %u unfinished async work items",
static_cast<unsigned int>(mPendingWork));

// OnDone below can destroy us before we unwind all the way back into the
// exchange code and it tries to close itself. Make sure that it doesn't
// try to notify us that it's closing, since we will be dead.
//
// For more details, see #10344.
if (mpExchangeCtx != nullptr)
{
mpExchangeCtx->SetDelegate(nullptr);
}

mpExchangeCtx = nullptr;

if (mpCallback)
{
mpCallback->OnDone(*this);
Expand All @@ -205,21 +186,12 @@ void CommandHandler::DecrementHoldOff()
return;
}

if (mpExchangeCtx->IsGroupExchangeContext())
{
mpExchangeCtx->Close();
}
else
if (!mExchangeCtx->IsGroupExchangeContext() && !mSentStatusResponse)
{
CHIP_ERROR err = SendCommandResponse();
if (err != CHIP_NO_ERROR)
{
ChipLogError(DataManagement, "Failed to send command response: %" CHIP_ERROR_FORMAT, err.Format());
// We marked the exchange as "WillSendMessage", need to shutdown the exchange manually to avoid leaking exchanges.
if (mpExchangeCtx != nullptr)
{
mpExchangeCtx->Close();
}
}
}

Expand All @@ -232,11 +204,11 @@ CHIP_ERROR CommandHandler::SendCommandResponse()

VerifyOrReturnError(mPendingWork == 0, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mState == State::AddedCommand, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mpExchangeCtx != nullptr, CHIP_ERROR_INCORRECT_STATE);
VerifyOrReturnError(mExchangeCtx, CHIP_ERROR_INCORRECT_STATE);

ReturnErrorOnFailure(Finalize(commandPacket));
ReturnErrorOnFailure(
mpExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::InvokeCommandResponse, std::move(commandPacket)));
mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::InvokeCommandResponse, std::move(commandPacket)));
// The ExchangeContext is automatically freed here, and it makes mpExchangeCtx be temporarily dangling, but in
// all cases, we are going to call Close immediately after this function, which nulls out mpExchangeCtx.

Expand Down Expand Up @@ -290,7 +262,7 @@ CHIP_ERROR CommandHandler::ProcessCommandDataIB(CommandDataIB::Parser & aCommand
}
}

VerifyOrExit(mpExchangeCtx != nullptr && mpExchangeCtx->HasSessionHandle(), err = CHIP_ERROR_INCORRECT_STATE);
VerifyOrExit(mExchangeCtx && mExchangeCtx->HasSessionHandle(), err = CHIP_ERROR_INCORRECT_STATE);

{
Access::SubjectDescriptor subjectDescriptor = GetSubjectDescriptor();
Expand Down Expand Up @@ -379,7 +351,7 @@ CHIP_ERROR CommandHandler::ProcessGroupCommandDataIB(CommandDataIB::Parser & aCo
err = commandPath.GetCommandId(&commandId);
SuccessOrExit(err);

groupId = mpExchangeCtx->GetSessionHandle()->AsIncomingGroupSession()->GetGroupId();
groupId = mExchangeCtx->GetSessionHandle()->AsIncomingGroupSession()->GetGroupId();
fabric = GetAccessingFabricIndex();

ChipLogDetail(DataManagement, "Received group command for Group=%u Cluster=" ChipLogFormatMEI " Command=" ChipLogFormatMEI,
Expand Down Expand Up @@ -596,7 +568,7 @@ TLV::TLVWriter * CommandHandler::GetCommandDataIBTLVWriter()

FabricIndex CommandHandler::GetAccessingFabricIndex() const
{
return mpExchangeCtx->GetSessionHandle()->GetFabricIndex();
return mExchangeCtx->GetSessionHandle()->GetFabricIndex();
}

CommandHandler * CommandHandler::Handle::Get()
Expand Down Expand Up @@ -666,27 +638,6 @@ void CommandHandler::MoveToState(const State aTargetState)
ChipLogDetail(DataManagement, "ICR moving to [%10.10s]", GetStateStr());
}

void CommandHandler::Abort()
{
//
// If the exchange context hasn't already been gracefully closed
// (signaled by setting it to null), then we need to forcibly
// tear it down.
//
if (mpExchangeCtx != nullptr)
{
// We might be a delegate for this exchange, and we don't want the
// OnExchangeClosing notification in that case. Null out the delegate
// to avoid that.
//
// TODO: This makes all sorts of assumptions about what the delegate is
// (notice the "might" above!) that might not hold in practice. We
// really need a better solution here....
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx->Abort();
mpExchangeCtx = nullptr;
}
}
} // namespace app
} // namespace chip

Expand Down
46 changes: 31 additions & 15 deletions src/app/CommandHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
#include <lib/support/CodeUtils.h>
#include <lib/support/DLLUtil.h>
#include <lib/support/logging/CHIPLogging.h>
#include <messaging/ExchangeContext.h>
#include <messaging/ExchangeHolder.h>
#include <messaging/Flags.h>
#include <protocols/Protocols.h>
#include <protocols/interaction_model/Constants.h>
Expand All @@ -46,16 +46,9 @@
namespace chip {
namespace app {

class CommandHandler
class CommandHandler : public Messaging::ExchangeDelegate
{
public:
/*
* Destructor - as part of destruction, it will abort the exchange context
* if a valid one still exists.
*
* See Abort() for details on when that might occur.
*/
virtual ~CommandHandler() { Abort(); }
class Callback
{
public:
Expand Down Expand Up @@ -221,11 +214,15 @@ class CommandHandler
/**
* Gets the inner exchange context object, without ownership.
*
* WARNING: This is dangerous, since it is directly interacting with the
* exchange being managed automatically by mExchangeCtx and
* if not done carefully, may end up with use-after-free errors.
*
* @return The inner exchange context, might be nullptr if no
* exchange context has been assigned or the context
* has been released.
*/
Messaging::ExchangeContext * GetExchangeContext() const { return mpExchangeCtx; }
Messaging::ExchangeContext * GetExchangeContext() const { return mExchangeCtx.Get(); }

/**
* @brief Flush acks right away for a slow command
Expand All @@ -240,18 +237,35 @@ class CommandHandler
*/
void FlushAcksRightAwayOnSlowCommand()
{
VerifyOrReturn(mpExchangeCtx != nullptr);
auto * msgContext = mpExchangeCtx->GetReliableMessageContext();
VerifyOrReturn(mExchangeCtx);
auto * msgContext = mExchangeCtx->GetReliableMessageContext();
VerifyOrReturn(msgContext != nullptr);
msgContext->FlushAcks();
}

Access::SubjectDescriptor GetSubjectDescriptor() const { return mpExchangeCtx->GetSessionHandle()->GetSubjectDescriptor(); }
Access::SubjectDescriptor GetSubjectDescriptor() const { return mExchangeCtx->GetSessionHandle()->GetSubjectDescriptor(); }

private:
friend class TestCommandInteraction;
friend class CommandHandler::Handle;

CHIP_ERROR OnMessageReceived(Messaging::ExchangeContext * ec, const PayloadHeader & payloadHeader,
System::PacketBufferHandle && payload) override
{
//
// We shouldn't be receiving any further messages on this exchange.
//
return CHIP_ERROR_INCORRECT_STATE;
}

void OnResponseTimeout(Messaging::ExchangeContext * ec) override
{
//
// We're not expecting responses to any messages we send out on this EC.
//
VerifyOrDie(false);
}

enum class State
{
Idle, ///< Default state that the object starts out in, where no work has commenced
Expand Down Expand Up @@ -343,14 +357,16 @@ class CommandHandler
return FinishCommand(/* aEndDataStruct = */ false);
}

Messaging::ExchangeContext * mpExchangeCtx = nullptr;
Callback * mpCallback = nullptr;
Messaging::ExchangeHolder mExchangeCtx;
Callback * mpCallback = nullptr;
InvokeResponseMessage::Builder mInvokeResponseBuilder;
TLV::TLVType mDataElementContainerType = TLV::kTLVType_NotSpecified;
size_t mPendingWork = 0;
bool mSuppressResponse = false;
bool mTimedRequest = false;

bool mSentStatusResponse = false;

State mState = State::Idle;
chip::System::PacketBufferTLVWriter mCommandMessageWriter;
TLV::TLVWriter mBackupWriter;
Expand Down
66 changes: 17 additions & 49 deletions src/app/CommandSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ namespace chip {
namespace app {

CommandSender::CommandSender(Callback * apCallback, Messaging::ExchangeManager * apExchangeMgr, bool aIsTimedRequest) :
mpCallback(apCallback), mpExchangeMgr(apExchangeMgr), mSuppressResponse(false), mTimedRequest(aIsTimedRequest)
mExchangeCtx(*this), mpCallback(apCallback), mpExchangeMgr(apExchangeMgr), mSuppressResponse(false),
mTimedRequest(aIsTimedRequest)
{}

CHIP_ERROR CommandSender::AllocateBuffer()
Expand Down Expand Up @@ -67,15 +68,17 @@ CHIP_ERROR CommandSender::SendCommandRequest(const SessionHandle & session, Opti
ReturnErrorOnFailure(Finalize(mPendingInvokeData));

// Create a new exchange context.
mpExchangeCtx = mpExchangeMgr->NewContext(session, this);
VerifyOrReturnError(mpExchangeCtx != nullptr, CHIP_ERROR_NO_MEMORY);
VerifyOrReturnError(!mpExchangeCtx->IsGroupExchangeContext(), CHIP_ERROR_INVALID_MESSAGE_TYPE);
auto exchange = mpExchangeMgr->NewContext(session, this);
VerifyOrReturnError(exchange != nullptr, CHIP_ERROR_NO_MEMORY);

mpExchangeCtx->SetResponseTimeout(timeout.ValueOr(session->ComputeRoundTripTimeout(app::kExpectedIMProcessingTime)));
mExchangeCtx.Grab(exchange);
VerifyOrReturnError(!mExchangeCtx->IsGroupExchangeContext(), CHIP_ERROR_INVALID_MESSAGE_TYPE);

mExchangeCtx->SetResponseTimeout(timeout.ValueOr(session->ComputeRoundTripTimeout(app::kExpectedIMProcessingTime)));

if (mTimedInvokeTimeoutMs.HasValue())
{
ReturnErrorOnFailure(TimedRequest::Send(mpExchangeCtx, mTimedInvokeTimeoutMs.Value()));
ReturnErrorOnFailure(TimedRequest::Send(mExchangeCtx.Get(), mTimedInvokeTimeoutMs.Value()));
MoveToState(State::AwaitingTimedStatus);
return CHIP_NO_ERROR;
}
Expand All @@ -90,14 +93,13 @@ CHIP_ERROR CommandSender::SendGroupCommandRequest(const SessionHandle & session)
ReturnErrorOnFailure(Finalize(mPendingInvokeData));

// Create a new exchange context.
mpExchangeCtx = mpExchangeMgr->NewContext(session, this);
VerifyOrReturnError(mpExchangeCtx != nullptr, CHIP_ERROR_NO_MEMORY);
VerifyOrReturnError(mpExchangeCtx->IsGroupExchangeContext(), CHIP_ERROR_INVALID_MESSAGE_TYPE);
auto exchange = mpExchangeMgr->NewContext(session, this);
VerifyOrReturnError(exchange != nullptr, CHIP_ERROR_NO_MEMORY);

ReturnErrorOnFailure(SendInvokeRequest());
mExchangeCtx.Grab(exchange);
VerifyOrReturnError(mExchangeCtx->IsGroupExchangeContext(), CHIP_ERROR_INVALID_MESSAGE_TYPE);

// Exchange is gone now, since it closed itself on successful send.
mpExchangeCtx = nullptr;
ReturnErrorOnFailure(SendInvokeRequest());

Close();
return CHIP_NO_ERROR;
Expand All @@ -108,8 +110,8 @@ CHIP_ERROR CommandSender::SendInvokeRequest()
using namespace Protocols::InteractionModel;
using namespace Messaging;

ReturnErrorOnFailure(mpExchangeCtx->SendMessage(MsgType::InvokeCommandRequest, std::move(mPendingInvokeData),
SendMessageFlags::kExpectResponse));
ReturnErrorOnFailure(
mExchangeCtx->SendMessage(MsgType::InvokeCommandRequest, std::move(mPendingInvokeData), SendMessageFlags::kExpectResponse));
MoveToState(State::CommandSent);

return CHIP_NO_ERROR;
Expand All @@ -124,7 +126,7 @@ CHIP_ERROR CommandSender::OnMessageReceived(Messaging::ExchangeContext * apExcha
}

CHIP_ERROR err = CHIP_NO_ERROR;
VerifyOrExit(apExchangeContext == mpExchangeCtx, err = CHIP_ERROR_INCORRECT_STATE);
VerifyOrExit(apExchangeContext == mExchangeCtx.Get(), err = CHIP_ERROR_INCORRECT_STATE);

if (mState == State::AwaitingTimedStatus)
{
Expand Down Expand Up @@ -223,18 +225,6 @@ void CommandSender::Close()
mTimedRequest = false;
MoveToState(State::AwaitingDestruction);

// OnDone below can destroy us before we unwind all the way back into the
// exchange code and it tries to close itself. Make sure that it doesn't
// try to notify us that it's closing, since we will be dead.
//
// For more details, see #10344.
if (mpExchangeCtx != nullptr)
{
mpExchangeCtx->SetDelegate(nullptr);
}

mpExchangeCtx = nullptr;

if (mpCallback)
{
mpCallback->OnDone(this);
Expand Down Expand Up @@ -443,27 +433,5 @@ void CommandSender::MoveToState(const State aTargetState)
ChipLogDetail(DataManagement, "ICR moving to [%10.10s]", GetStateStr());
}

void CommandSender::Abort()
{
//
// If the exchange context hasn't already been gracefully closed
// (signaled by setting it to null), then we need to forcibly
// tear it down.
//
if (mpExchangeCtx != nullptr)
{
// We might be a delegate for this exchange, and we don't want the
// OnExchangeClosing notification in that case. Null out the delegate
// to avoid that.
//
// TODO: This makes all sorts of assumptions about what the delegate is
// (notice the "might" above!) that might not hold in practice. We
// really need a better solution here....
mpExchangeCtx->SetDelegate(nullptr);
mpExchangeCtx->Abort();
mpExchangeCtx = nullptr;
}
}

} // namespace app
} // namespace chip
Loading

0 comments on commit 1201632

Please sign in to comment.