Skip to content

Commit

Permalink
Make RPC call synchronous in Fabric Sync (#34187)
Browse files Browse the repository at this point in the history
* Make RPC call synchronous in Fabric Sync

* Address review comments
  • Loading branch information
yufengwangca authored Jul 12, 2024
1 parent 4c6a612 commit a5d8f62
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 8 deletions.
42 changes: 39 additions & 3 deletions examples/fabric-admin/rpc/RpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
#include "RpcClient.h"
#include "RpcClientProcessor.h"

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <unistd.h>

#include "fabric_bridge_service/fabric_bridge_service.rpc.pb.h"
#include "pw_assert/check.h"
Expand All @@ -36,16 +38,45 @@ using namespace chip;
namespace {

// Constants
constexpr uint32_t kRpcTimeoutMs = 1000;
constexpr uint32_t kDefaultChannelId = 1;

// Fabric Bridge Client
rpc::pw_rpc::nanopb::FabricBridge::Client fabricBridgeClient(rpc::client::GetDefaultRpcClient(), kDefaultChannelId);
pw::rpc::NanopbUnaryReceiver<::pw_protobuf_Empty> addSynchronizedDeviceCall;
pw::rpc::NanopbUnaryReceiver<::pw_protobuf_Empty> removeSynchronizedDeviceCall;

std::mutex responseMutex;
std::condition_variable responseCv;
bool responseReceived = false;
CHIP_ERROR responseError = CHIP_NO_ERROR;

template <typename CallType>
CHIP_ERROR WaitForResponse(CallType & call)
{
std::unique_lock<std::mutex> lock(responseMutex);
responseReceived = false;
responseError = CHIP_NO_ERROR;

if (responseCv.wait_for(lock, std::chrono::milliseconds(kRpcTimeoutMs), [] { return responseReceived; }))
{
return responseError;
}
else
{
fprintf(stderr, "RPC Response timed out!");
return CHIP_ERROR_TIMEOUT;
}
}

// Callback function to be called when the RPC response is received
void OnAddDeviceResponseCompleted(const pw_protobuf_Empty & response, pw::Status status)
{
std::lock_guard<std::mutex> lock(responseMutex);
responseReceived = true;
responseError = status.ok() ? CHIP_NO_ERROR : CHIP_ERROR_INTERNAL;
responseCv.notify_one();

if (status.ok())
{
ChipLogProgress(NotSpecified, "AddSynchronizedDevice RPC call succeeded!");
Expand All @@ -59,6 +90,11 @@ void OnAddDeviceResponseCompleted(const pw_protobuf_Empty & response, pw::Status
// Callback function to be called when the RPC response is received
void OnRemoveDeviceResponseCompleted(const pw_protobuf_Empty & response, pw::Status status)
{
std::lock_guard<std::mutex> lock(responseMutex);
responseReceived = true;
responseError = status.ok() ? CHIP_NO_ERROR : CHIP_ERROR_INTERNAL;
responseCv.notify_one();

if (status.ok())
{
ChipLogProgress(NotSpecified, "RemoveSynchronizedDevice RPC call succeeded!");
Expand Down Expand Up @@ -101,7 +137,7 @@ CHIP_ERROR AddSynchronizedDevice(chip::NodeId nodeId)
return CHIP_ERROR_INTERNAL;
}

return CHIP_NO_ERROR;
return WaitForResponse(addSynchronizedDeviceCall);
}

CHIP_ERROR RemoveSynchronizedDevice(chip::NodeId nodeId)
Expand All @@ -128,5 +164,5 @@ CHIP_ERROR RemoveSynchronizedDevice(chip::NodeId nodeId)
return CHIP_ERROR_INTERNAL;
}

return CHIP_NO_ERROR;
return WaitForResponse(removeSynchronizedDeviceCall);
}
4 changes: 2 additions & 2 deletions examples/fabric-admin/rpc/RpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ CHIP_ERROR InitRpcClient(uint16_t rpcServerPort);
*
* @param nodeId The Node ID of the device to be added.
* @return CHIP_ERROR An error code indicating the success or failure of the operation.
* - CHIP_NO_ERROR: The RPC command was successfully sent.
* - CHIP_NO_ERROR: The RPC command was successfully processed.
* - CHIP_ERROR_BUSY: Another operation is currently in progress.
* - CHIP_ERROR_INTERNAL: An internal error occurred while activating the RPC call.
*/
Expand All @@ -56,7 +56,7 @@ CHIP_ERROR AddSynchronizedDevice(chip::NodeId nodeId);
*
* @param nodeId The Node ID of the device to be removed.
* @return CHIP_ERROR An error code indicating the success or failure of the operation.
* - CHIP_NO_ERROR: The RPC command was successfully sent.
* - CHIP_NO_ERROR: The RPC command was successfully processed.
* - CHIP_ERROR_BUSY: Another operation is currently in progress.
* - CHIP_ERROR_INTERNAL: An internal error occurred while activating the RPC call.
*/
Expand Down
36 changes: 34 additions & 2 deletions examples/fabric-bridge-app/linux/RpcClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
#include "RpcClient.h"
#include "RpcClientProcessor.h"

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <unistd.h>

#include "fabric_admin_service/fabric_admin_service.rpc.pb.h"
#include "pw_assert/check.h"
Expand All @@ -36,15 +38,44 @@ using namespace chip;
namespace {

// Constants
constexpr uint32_t kRpcTimeoutMs = 1000;
constexpr uint32_t kDefaultChannelId = 1;

// Fabric Admin Client
rpc::pw_rpc::nanopb::FabricAdmin::Client fabricAdminClient(rpc::client::GetDefaultRpcClient(), kDefaultChannelId);
pw::rpc::NanopbUnaryReceiver<::chip_rpc_OperationStatus> openCommissioningWindowCall;

std::mutex responseMutex;
std::condition_variable responseCv;
bool responseReceived = false;
CHIP_ERROR responseError = CHIP_NO_ERROR;

template <typename CallType>
CHIP_ERROR WaitForResponse(CallType & call)
{
std::unique_lock<std::mutex> lock(responseMutex);
responseReceived = false;
responseError = CHIP_NO_ERROR;

if (responseCv.wait_for(lock, std::chrono::milliseconds(kRpcTimeoutMs), [] { return responseReceived; }))
{
return responseError;
}
else
{
ChipLogError(NotSpecified, "RPC Response timed out!");
return CHIP_ERROR_TIMEOUT;
}
}

// Callback function to be called when the RPC response is received
void OnOpenCommissioningWindowCompleted(const chip_rpc_OperationStatus & response, pw::Status status)
{
std::lock_guard<std::mutex> lock(responseMutex);
responseReceived = true;
responseError = status.ok() ? CHIP_NO_ERROR : CHIP_ERROR_INTERNAL;
responseCv.notify_one();

if (status.ok())
{
ChipLogProgress(NotSpecified, "OpenCommissioningWindow received operation status: %d", response.success);
Expand Down Expand Up @@ -81,8 +112,9 @@ CHIP_ERROR OpenCommissioningWindow(NodeId nodeId)

if (!openCommissioningWindowCall.active())
{
// The RPC call was not sent. This could occur due to, for example, an invalid channel ID. Handle if necessary.
return CHIP_ERROR_INTERNAL;
}

return CHIP_NO_ERROR;
return WaitForResponse(openCommissioningWindowCall);
}
2 changes: 1 addition & 1 deletion examples/fabric-bridge-app/linux/include/RpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ CHIP_ERROR InitRpcClient(uint16_t rpcServerPort);
*
* @param nodeId The identifier of the node for which the commissioning window should be opened.
* @return CHIP_ERROR An error code indicating the success or failure of the operation.
* - CHIP_NO_ERROR: The RPC command was successfully sent.
* - CHIP_NO_ERROR: The RPC command was successfully processed.
* - CHIP_ERROR_BUSY: Another commissioning window is currently in progress.
* - CHIP_ERROR_INTERNAL: An internal error occurred.
*/
Expand Down

0 comments on commit a5d8f62

Please sign in to comment.