Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix management of the mNumReportsInFlight count in reporting engine. (#24093) #24114

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions src/app/ReadHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ CHIP_ERROR ReadHandler::SendStatusReport(Protocols::InteractionModel::Status aSt
CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks)
{
VerifyOrReturnLogError(IsReportable(), CHIP_ERROR_INCORRECT_STATE);
VerifyOrDie(!IsAwaitingReportResponse()); // Should not be reportable!
if (IsPriming() || IsChunkedReport())
{
mSessionHandle.Grab(mExchangeCtx->GetSessionHandle());
Expand All @@ -217,27 +218,31 @@ CHIP_ERROR ReadHandler::SendReportData(System::PacketBufferHandle && aPayload, b
mCurrentReportsBeginGeneration = InteractionModelEngine::GetInstance()->GetReportingEngine().GetDirtySetGeneration();
}
SetStateFlag(ReadHandlerFlags::ChunkedReport, aMoreChunks);
bool noResponseExpected = IsType(InteractionType::Read) && !aMoreChunks;
if (!noResponseExpected)
{
MoveToState(HandlerState::AwaitingReportResponse);
}
bool responseExpected = IsType(InteractionType::Subscribe) || aMoreChunks;

mExchangeCtx->UseSuggestedResponseTimeout(app::kExpectedIMProcessingTime);
CHIP_ERROR err =
mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReportData, std::move(aPayload),
Messaging::SendFlags(noResponseExpected ? Messaging::SendMessageFlags::kNone
: Messaging::SendMessageFlags::kExpectResponse));
if (err == CHIP_NO_ERROR && noResponseExpected)
{
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
}

CHIP_ERROR err = mExchangeCtx->SendMessage(Protocols::InteractionModel::MsgType::ReportData, std::move(aPayload),
responseExpected ? Messaging::SendMessageFlags::kExpectResponse
: Messaging::SendMessageFlags::kNone);
if (err == CHIP_NO_ERROR)
{
if (responseExpected)
{
MoveToState(HandlerState::AwaitingReportResponse);
}
else
{
// Make sure we're not treated as an in-flight report waiting for a
// response by the reporting engine.
InteractionModelEngine::GetInstance()->GetReportingEngine().OnReportConfirm();
}

if (IsType(InteractionType::Subscribe) && !IsPriming())
{
err = RefreshSubscribeSyncTimer();
// Ignore the error from RefreshSubscribeSyncTimer. If we've
// successfully sent the message, we need to return success from
// this method.
RefreshSubscribeSyncTimer();
}
}
if (!aMoreChunks)
Expand Down
2 changes: 2 additions & 0 deletions src/app/ReadHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ class ReadHandler : public Messaging::ExchangeDelegate
* @retval #Others If fails to send report data
* @retval #CHIP_NO_ERROR On success.
*
* If an error is returned, the ReadHandler guarantees that it is not in
* a state where it's waiting for a response.
*/
CHIP_ERROR SendReportData(System::PacketBufferHandle && aPayload, bool aMoreChunks);

Expand Down
4 changes: 4 additions & 0 deletions src/app/reporting/Engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,10 @@ CHIP_ERROR Engine::SendReport(ReadHandler * apReadHandler, System::PacketBufferH
// We can only have 1 report in flight for any given read - increment and break out.
mNumReportsInFlight++;
err = apReadHandler->SendReportData(std::move(aPayload), aHasMoreChunks);
if (err != CHIP_NO_ERROR)
{
--mNumReportsInFlight;
}
return err;
}

Expand Down
142 changes: 142 additions & 0 deletions src/app/tests/TestReadInteraction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ class TestReadInteraction
static void TestSubscribeRoundtripStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestPostSubscribeRoundtripStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestReadChunkingStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestReadReportFailure(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestPostSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext);
static void TestPostSubscribeRoundtripChunkReportTimeout(nlTestSuite * apSuite, void * apContext);
Expand All @@ -338,6 +339,7 @@ class TestReadInteraction
static void TestReadHandlerInvalidSubscribeRequest(nlTestSuite * apSuite, void * apContext);
static void TestSubscribeInvalidateFabric(nlTestSuite * apSuite, void * apContext);
static void TestShutdownSubscription(nlTestSuite * apSuite, void * apContext);
static void TestSubscriptionReportWithDefunctSession(nlTestSuite * apSuite, void * apContext);
static void TestReadHandlerMalformedSubscribeRequest(nlTestSuite * apSuite, void * apContext);

private:
Expand Down Expand Up @@ -2536,6 +2538,57 @@ void TestReadInteraction::TestReadChunkingStatusReportTimeout(nlTestSuite * apSu
ctx.CreateSessionBobToAlice();
}

// ReadClient sends the read request, but handler fails to send the one report (SendMessage returns an error).
// Since this is an un-chunked read, we are not in the AwaitingReportResponse state, so the "reports in flight"
// counter should not increase.
void TestReadInteraction::TestReadReportFailure(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
CHIP_ERROR err = CHIP_NO_ERROR;

Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
// Shouldn't have anything in the retransmit table when starting the test.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

MockInteractionModelApp delegate;
auto * engine = chip::app::InteractionModelEngine::GetInstance();
err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable());
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);
NL_TEST_ASSERT(apSuite, !delegate.mGotEventResponse);

chip::app::AttributePathParams attributePathParams[1];
attributePathParams[0].mEndpointId = Test::kMockEndpoint2;
attributePathParams[0].mClusterId = Test::MockClusterId(3);
attributePathParams[0].mAttributeId = Test::MockAttributeId(1);

ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
readPrepareParams.mpEventPathParamsList = nullptr;
readPrepareParams.mEventPathParamsListSize = 0;
readPrepareParams.mpAttributePathParamsList = attributePathParams;
readPrepareParams.mAttributePathParamsListSize = 1;

{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Read);

ctx.GetLoopback().mNumMessagesToAllowBeforeError = 1;
ctx.GetLoopback().mMessageSendError = CHIP_ERROR_INCORRECT_STATE;
err = readClient.SendRequest(readPrepareParams);
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers() == 0);

ctx.GetLoopback().mNumMessagesToAllowBeforeError = 0;
ctx.GetLoopback().mMessageSendError = CHIP_NO_ERROR;
}

NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
engine->Shutdown();
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

void TestReadInteraction::TestSubscribeRoundtripChunkStatusReportTimeout(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
Expand Down Expand Up @@ -4101,6 +4154,93 @@ void TestReadInteraction::TestShutdownSubscription(nlTestSuite * apSuite, void *
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);
}

/**
* Tests what happens when a subscription tries to deliver reports but the
* session it has is defunct. Makes sure we correctly tear down the ReadHandler
* and don't increment the "reports in flight" count.
*/
void TestReadInteraction::TestSubscriptionReportWithDefunctSession(nlTestSuite * apSuite, void * apContext)
{
TestContext & ctx = *static_cast<TestContext *>(apContext);
CHIP_ERROR err = CHIP_NO_ERROR;

Messaging::ReliableMessageMgr * rm = ctx.GetExchangeManager().GetReliableMessageMgr();
// Shouldn't have anything in the retransmit table when starting the test.
NL_TEST_ASSERT(apSuite, rm->TestGetCountRetransTable() == 0);

MockInteractionModelApp delegate;
auto * engine = chip::app::InteractionModelEngine::GetInstance();
err = engine->Init(&ctx.GetExchangeManager(), &ctx.GetFabricTable());
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ReadPrepareParams readPrepareParams(ctx.GetSessionBobToAlice());
readPrepareParams.mpAttributePathParamsList = new chip::app::AttributePathParams[1];
readPrepareParams.mAttributePathParamsListSize = 1;

AttributePathParams subscribePath(Test::kMockEndpoint3, Test::MockClusterId(2), Test::MockAttributeId(4));
readPrepareParams.mpAttributePathParamsList[0] = subscribePath;

readPrepareParams.mMinIntervalFloorSeconds = 0;
readPrepareParams.mMaxIntervalCeilingSeconds = 0;

{
app::ReadClient readClient(chip::app::InteractionModelEngine::GetInstance(), &ctx.GetExchangeManager(), delegate,
chip::app::ReadClient::InteractionType::Subscribe);

delegate.mGotReport = false;

err = readClient.SendSubscribeRequest(std::move(readPrepareParams));
NL_TEST_ASSERT(apSuite, err == CHIP_NO_ERROR);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0);
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);

NL_TEST_ASSERT(apSuite, engine->ActiveHandlerAt(0) != nullptr);
auto * readHandler = engine->ActiveHandlerAt(0);

// Verify that the session we will reset later is the one we will mess
// with now.
NL_TEST_ASSERT(apSuite, SessionHandle(*readHandler->GetSession()) == ctx.GetSessionAliceToBob());

// Test that we send reports as needed.
readHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
engine->GetReportingEngine().SetDirty(subscribePath);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 1);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0);
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);

// Test that if the session is defunct we don't send reports and clean
// up properly.
readHandler->GetSession()->MarkAsDefunct();
readHandler->mFlags.Set(ReadHandler::ReadHandlerFlags::HoldReport, false);
delegate.mGotReport = false;
engine->GetReportingEngine().SetDirty(subscribePath);

ctx.DrainAndServiceIO();

NL_TEST_ASSERT(apSuite, !delegate.mGotReport);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Subscribe) == 0);
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadHandlers(ReadHandler::InteractionType::Read) == 0);
NL_TEST_ASSERT(apSuite, engine->GetReportingEngine().GetNumReportsInFlight() == 0);
}
engine->Shutdown();
NL_TEST_ASSERT(apSuite, engine->GetNumActiveReadClients() == 0);
NL_TEST_ASSERT(apSuite, ctx.GetExchangeManager().GetNumActiveExchanges() == 0);

// Get rid of our defunct session.
ctx.ExpireSessionAliceToBob();
ctx.CreateSessionAliceToBob();
}

} // namespace app
} // namespace chip

Expand Down Expand Up @@ -4160,10 +4300,12 @@ const nlTest sTests[] =
NL_TEST_DEF("TestSubscribeRoundtripStatusReportTimeout", chip::app::TestReadInteraction::TestSubscribeRoundtripStatusReportTimeout),
NL_TEST_DEF("TestPostSubscribeRoundtripStatusReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripStatusReportTimeout),
NL_TEST_DEF("TestReadChunkingStatusReportTimeout", chip::app::TestReadInteraction::TestReadChunkingStatusReportTimeout),
NL_TEST_DEF("TestReadReportFailure", chip::app::TestReadInteraction::TestReadReportFailure),
NL_TEST_DEF("TestSubscribeRoundtripChunkStatusReportTimeout", chip::app::TestReadInteraction::TestSubscribeRoundtripChunkStatusReportTimeout),
NL_TEST_DEF("TestPostSubscribeRoundtripChunkStatusReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripChunkStatusReportTimeout),
NL_TEST_DEF("TestPostSubscribeRoundtripChunkReportTimeout", chip::app::TestReadInteraction::TestPostSubscribeRoundtripChunkReportTimeout),
NL_TEST_DEF("TestReadShutdown", chip::app::TestReadInteraction::TestReadShutdown),
NL_TEST_DEF("TestSubscriptionReportWithDefunctSession", chip::app::TestReadInteraction::TestSubscriptionReportWithDefunctSession),
NL_TEST_SENTINEL()
};
// clang-format on
Expand Down
11 changes: 10 additions & 1 deletion src/transport/raw/tests/NetworkTestHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,16 @@ class LoopbackTransport : public Transport::Base

CHIP_ERROR SendMessage(const Transport::PeerAddress & address, System::PacketBufferHandle && msgBuf) override
{
ReturnErrorOnFailure(mMessageSendError);
if (mNumMessagesToAllowBeforeError == 0)
{
ReturnErrorOnFailure(mMessageSendError);
}
mSentMessageCount++;
bool dropMessage = false;
if (mNumMessagesToAllowBeforeError > 0)
{
--mNumMessagesToAllowBeforeError;
}
if (mNumMessagesToAllowBeforeDropping > 0)
{
--mNumMessagesToAllowBeforeDropping;
Expand Down Expand Up @@ -143,6 +150,7 @@ class LoopbackTransport : public Transport::Base
mDroppedMessageCount = 0;
mSentMessageCount = 0;
mNumMessagesToAllowBeforeDropping = 0;
mNumMessagesToAllowBeforeError = 0;
mMessageSendError = CHIP_NO_ERROR;
}

Expand All @@ -163,6 +171,7 @@ class LoopbackTransport : public Transport::Base
uint32_t mDroppedMessageCount = 0;
uint32_t mSentMessageCount = 0;
uint32_t mNumMessagesToAllowBeforeDropping = 0;
uint32_t mNumMessagesToAllowBeforeError = 0;
CHIP_ERROR mMessageSendError = CHIP_NO_ERROR;
LoopbackTransportDelegate * mDelegate = nullptr;
};
Expand Down