Skip to content

Commit

Permalink
[core] protobuf service client to v6 (#1981)
Browse files Browse the repository at this point in the history
  • Loading branch information
rex-schilasky authored Feb 13, 2025
1 parent a056c5a commit db3f4f1
Show file tree
Hide file tree
Showing 39 changed files with 2,397 additions and 642 deletions.
22 changes: 13 additions & 9 deletions app/rec/rec_server_cli/src/commands/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,23 +73,27 @@ namespace eCAL
}
}

eCAL::rec::Error Command::CallRemoteEcalrecService(const std::shared_ptr<eCAL::protobuf::CServiceClient<eCAL::pb::rec_server::EcalRecServerService>>& remote_ecalsys_service
eCAL::rec::Error Command::CallRemoteEcalrecService(const std::shared_ptr<eCAL::protobuf::CServiceClient<eCAL::pb::rec_server::EcalRecServerService>>& remote_ecalrec_service
, const std::string& hostname
, const std::string& method_name
, const google::protobuf::Message& request
, google::protobuf::Message& response)
{
remote_ecalsys_service->SetHostName(hostname);
constexpr int timeout_ms(1000);

eCAL::v5::ServiceResponseVecT service_response_vec;
constexpr int timeout_ms = 1000;

if (remote_ecalsys_service->Call(method_name, request.SerializeAsString(), timeout_ms, &service_response_vec))
auto client_instances = remote_ecalrec_service->GetClientInstances();
for (auto& client_instance : client_instances)
{
if (service_response_vec.size() > 0)
// TODO: We need to filter for pid as well in the future?
// Currently empty hostname means "all hosts"
if (client_instance.GetClientID().host_name == hostname || hostname.empty())
{
response.ParseFromString(service_response_vec[0].response);
return eCAL::rec::Error::ErrorCode::OK;
auto client_instance_response = client_instance.CallWithResponse(method_name, request, timeout_ms);
if (client_instance_response.first)
{
response.ParseFromString(client_instance_response.second.response);
return eCAL::rec::Error::ErrorCode::OK;
}
}
}
return eCAL::rec::Error(eCAL::rec::Error::ErrorCode::REMOTE_HOST_UNAVAILABLE, hostname);
Expand Down
2 changes: 1 addition & 1 deletion app/rec/rec_server_cli/src/ecal_rec_server_cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ int main(int argc, char** argv)
else
{
std::cout << "Shutting down remote-control for eCAL Rec..." << std::endl;
remote_rec_server_service->Destroy();
remote_rec_server_service = nullptr;
}

// TODO: On rare occations, eCAL rec hangs when hitting ctrl+c on linux. Remove the debug output, once that bug is found.
Expand Down
23 changes: 13 additions & 10 deletions app/rec/rec_server_core/src/recorder/remote_recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ namespace eCAL
, last_response_ ({true, ""})
, should_be_connected_to_ecal_ (false)
, complete_settings_ (initial_settings)
, hostname_ (hostname)
{
// Bind the recorder_service_ to the hostname
recorder_service_.SetHostName(hostname);

// Initial Ping => to perform auto recovery, which also sets the initial settings
actions_to_perform_.emplace_back(Action());

Expand Down Expand Up @@ -667,16 +665,21 @@ namespace eCAL

bool RemoteRecorder::CallRecorderService(const std::string& method_name, const google::protobuf::Message& request, google::protobuf::Message& response)
{
// The target (i.e. the hostname) has already been set in the Constructor.
constexpr int timeout_ms(1000);

eCAL::v5::ServiceResponseVecT service_response_vec;
constexpr int timeout_ms = 1000;
if (recorder_service_.Call(method_name, request.SerializeAsString(), timeout_ms, &service_response_vec))
auto client_instances = recorder_service_.GetClientInstances();
for (auto& client_instance : client_instances)
{
if (service_response_vec.size() > 0)
// TODO: We need to filter for pid as well in the future?
// Currently empty hostname means "all hosts"
if (client_instance.GetClientID().host_name == hostname_ || hostname_.empty())
{
response.ParseFromString(service_response_vec[0].response);
return true;
auto client_instance_response = client_instance.CallWithResponse(method_name, request, timeout_ms);
if (client_instance_response.first)
{
response.ParseFromString(client_instance_response.second.response);
return true;
}
}
}
return false;
Expand Down
2 changes: 2 additions & 0 deletions app/rec/rec_server_core/src/recorder/remote_recorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ namespace eCAL
std::deque<Action> actions_to_perform_;

RecorderSettings complete_settings_;

std::string hostname_;
};
}
}
58 changes: 33 additions & 25 deletions app/rec/rec_tests/rec_rpc_tests/src/external_ecal_rec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ ExternalEcalRecInstance::ExternalEcalRecInstance(bool gui)
eCAL::Initialize("Ecal Rec Tester");

remote_rec_server_service = std::make_shared<eCAL::protobuf::CServiceClient<eCAL::pb::rec_server::EcalRecServerService>>();
remote_rec_server_service->SetHostName(eCAL::Process::GetHostName());

if (gui)
{
Expand Down Expand Up @@ -119,17 +118,23 @@ eCAL::rec::Error ExternalEcalRecInstance::GetConfigViaRpc(eCAL::rec_server::RecS

eCAL::rec::Error ExternalEcalRecInstance::GetConfigViaRpc(eCAL::pb::rec_server::RecServerConfig& config_pb_output)
{
eCAL::pb::rec_server::GenericRequest request;
eCAL::v5::ServiceResponseVecT service_response_vec;
const auto& hostname(eCAL::Process::GetHostName());
constexpr int timeout_ms(1000);

constexpr int timeout_ms = 1000;

if (remote_rec_server_service->Call("GetConfig", request.SerializeAsString(), timeout_ms, &service_response_vec))
auto client_instances = remote_rec_server_service->GetClientInstances();
for (auto& client_instance : client_instances)
{
if (service_response_vec.size() > 0)
// TODO: We need to filter for pid as well in the future?
// hostname is fixed to the current host
if (client_instance.GetClientID().host_name == hostname)
{
config_pb_output.ParseFromString(service_response_vec[0].response);
return eCAL::rec::Error::ErrorCode::OK;
eCAL::pb::rec_server::GenericRequest request;
auto client_instance_reponse = client_instance.CallWithResponse("GetConfig", request, timeout_ms);
if (client_instance_reponse.first)
{
config_pb_output.ParseFromString(client_instance_reponse.second.response);
return eCAL::rec::Error::ErrorCode::OK;
}
}
}
return eCAL::rec::Error(eCAL::rec::Error::ErrorCode::REMOTE_HOST_UNAVAILABLE);
Expand All @@ -143,28 +148,31 @@ eCAL::rec::Error ExternalEcalRecInstance::SetConfigViaRpc(const eCAL::rec_server

eCAL::rec::Error ExternalEcalRecInstance::SetConfigViaRpc(const eCAL::pb::rec_server::RecServerConfig& config_pb)
{
eCAL::v5::ServiceResponseVecT service_response_vec;

constexpr int timeout_ms = 1000;
const auto& hostname (eCAL::Process::GetHostName());
constexpr int timeout_ms (1000);

if (remote_rec_server_service->Call("SetConfig", config_pb.SerializeAsString(), timeout_ms, &service_response_vec))
auto client_instances = remote_rec_server_service->GetClientInstances();
for (auto& client_instance : client_instances)
{
if (service_response_vec.size() > 0)
// TODO: We need to filter for pid as well in the future?
// hostname is fixed to the current host
if (client_instance.GetClientID().host_name == hostname)
{
eCAL::pb::rec_server::ServiceResult response_pb;

response_pb.ParseFromString(service_response_vec[0].response);

if (response_pb.error_code() != eCAL::pb::rec_server::ServiceResult_ErrorCode_no_error)
auto client_instance_reponse = client_instance.CallWithResponse("SetConfig", config_pb, timeout_ms);
if (client_instance_reponse.first)
{
return eCAL::rec::Error(eCAL::rec::Error::ErrorCode::GENERIC_ERROR, response_pb.info_message());
}
else
{
return eCAL::rec::Error::ErrorCode::OK;
eCAL::pb::rec_server::ServiceResult response_pb;
response_pb.ParseFromString(client_instance_reponse.second.response);
if (response_pb.error_code() != eCAL::pb::rec_server::ServiceResult_ErrorCode_no_error)
{
return eCAL::rec::Error(eCAL::rec::Error::ErrorCode::GENERIC_ERROR, response_pb.info_message());
}
else
{
return eCAL::rec::Error::ErrorCode::OK;
}
}
}
}

return eCAL::rec::Error(eCAL::rec::Error::ErrorCode::REMOTE_HOST_UNAVAILABLE);
}
18 changes: 11 additions & 7 deletions app/sys/sys_cli/src/commands/command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,21 @@ namespace eCAL
, const google::protobuf::Message& request
, google::protobuf::Message& response)
{
remote_ecalsys_service->SetHostName(hostname);

eCAL::v5::ServiceResponseVecT service_response_vec;
constexpr int timeout_ms = 1000;

if (remote_ecalsys_service->Call(method_name, request.SerializeAsString(), timeout_ms, &service_response_vec))
auto client_instances = remote_ecalsys_service->GetClientInstances();
for (auto& client_instance : client_instances)
{
if (service_response_vec.size() > 0)
// TODO: We need to filter for pid as well in the future?
// Currently empty hostname means "all hosts"
if (client_instance.GetClientID().host_name == hostname || hostname.empty())
{
response.ParseFromString(service_response_vec[0].response);
return eCAL::sys::Error::ErrorCode::OK;
auto client_instance_response = client_instance.CallWithResponse(method_name, request, timeout_ms);
if (client_instance_response.first)
{
response.ParseFromString(client_instance_response.second.response);
return eCAL::sys::Error::ErrorCode::OK;
}
}
}
return eCAL::sys::Error(eCAL::sys::Error::ErrorCode::REMOTE_HOST_UNAVAILABLE, hostname);
Expand Down
24 changes: 14 additions & 10 deletions app/sys/sys_core/src/connection/remote_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ namespace eCAL
RemoteConnection::RemoteConnection(const std::string& hostname)
: AbstractConnection(hostname)
{
sys_client_service_.SetHostName(hostname);
}

RemoteConnection::~RemoteConnection()
Expand Down Expand Up @@ -127,9 +126,6 @@ namespace eCAL
{
std::lock_guard<decltype(connection_mutex_)> connection_lock(connection_mutex_);

eCAL::v5::ServiceResponseVecT service_response_vec;
constexpr int timeout_ms = 1000;

// After client creation it takes some time for the client to be actually connected.
// As the call and the creation is too close together, the first call will fail.
// Here we wait until the connection is established.
Expand All @@ -148,16 +144,24 @@ namespace eCAL
}
}

if (sys_client_service_.Call(method_name, request.SerializeAsString(), timeout_ms, &service_response_vec))
constexpr int timeout_ms = 1000;

auto client_instances = sys_client_service_.GetClientInstances();
for (auto& client_instance : client_instances)
{
if (service_response_vec.size() > 0)
// TODO: We need to filter for pid as well in the future?
// Currently empty hostname means "all hosts"
if (client_instance.GetClientID().host_name == m_hostname || m_hostname.empty())
{
response.ParseFromString(service_response_vec[0].response);
return true;
auto client_instance_response = client_instance.CallWithResponse(method_name, request, timeout_ms);
if (client_instance_response.first)
{
response.ParseFromString(client_instance_response.second.response);
return true;
}
}
}
return false;
}

}
}
}
3 changes: 3 additions & 0 deletions ecal/core/src/v5/service/ecal_service_client_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ namespace eCAL
bool success = false;
for (auto& instance : instances)
{
// Currently empty hostname means "all hosts"
if (instance.GetClientID().host_name == m_host_name || m_host_name.empty())
{
success |= instance.CallWithCallback(method_name_, request_, callback, timeout_);
Expand Down Expand Up @@ -231,6 +232,7 @@ namespace eCAL
bool success = false;
for (auto& instance : instances)
{
// Currently empty hostname means "all hosts"
if (instance.GetClientID().host_name == m_host_name || m_host_name.empty())
{
responses.emplace_back(instance.CallWithResponse(method_name_, request_, timeout_));
Expand Down Expand Up @@ -277,6 +279,7 @@ namespace eCAL
bool success = false;
for (auto& instance : instances)
{
// Currently empty hostname means "all hosts"
if (instance.GetClientID().host_name == m_host_name || m_host_name.empty())
{
success |= instance.CallWithCallbackAsync(method_name_, request_, callback);
Expand Down
11 changes: 5 additions & 6 deletions ecal/samples/cpp/orchestration/orchestrator/src/orchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,14 @@ int main()
// initialize eCAL API
eCAL::Initialize("orchestrator");

eCAL::protobuf::CServiceClient<orchestrator::ComponentService> component1("component1");
eCAL::protobuf::CServiceClient<orchestrator::ComponentService> component2("component2");
const eCAL::protobuf::CServiceClientTyped<orchestrator::ComponentService> component1("component1");
const eCAL::protobuf::CServiceClientTyped<orchestrator::ComponentService> component2("component2");

// sleep 2 seconds
eCAL::Process::SleepMS(2000);

// prepare service request and response vector
orchestrator::request srv_request;
eCAL::v5::ServiceResponseVecT srv_response_vec;
orchestrator::request srv_request;

// call components 1 and 2
uint64_t cycle = 0;
Expand All @@ -46,10 +45,10 @@ int main()
srv_request.set_id(cycle);

std::cout << "call component 1" << std::endl;
component1.Call("execute", srv_request, -1, &srv_response_vec);
component1.CallWithResponse<orchestrator::request, orchestrator::response>("execute", srv_request);

std::cout << "call component 2" << std::endl << std::endl;
component2.Call("execute", srv_request, -1, &srv_response_vec);
component2.CallWithResponse<orchestrator::request, orchestrator::response>("execute", srv_request);

++cycle;
}
Expand Down
6 changes: 3 additions & 3 deletions lang/python/core/src/ecal_wrap.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ namespace
PyDict_SetItemString(topicDict, "tdatatype_descriptor", val); Py_DECREF(val);

PyObject* layerList = PyList_New(0);
for (auto layer : topic.transport_layer)
for (const auto& layer : topic.transport_layer)
{
PyObject* layerDict = PyDict_New();
PyList_Append(layerList, layerDict); Py_DECREF(layerDict);
Expand Down Expand Up @@ -1144,7 +1144,7 @@ PyObject* mon_monitoring(PyObject* /*self*/, PyObject* /*args*/)
PyObject* methodsDict = PyDict_New();
PyDict_SetItemString(serviceDict, "methods", methodsDict); Py_DECREF(methodsDict);

for (const auto method : service.methods)
for (const auto& method : service.methods)
{
val = Py_BuildValue("s", method.method_name.c_str());
PyDict_SetItemString(methodsDict, "mname", val); Py_DECREF(val);
Expand Down Expand Up @@ -1207,7 +1207,7 @@ PyObject* mon_monitoring(PyObject* /*self*/, PyObject* /*args*/)
PyObject* methodsDict = PyDict_New();
PyDict_SetItemString(clientDict, "methods", methodsDict); Py_DECREF(methodsDict);

for (const auto method : client.methods)
for (const auto& method : client.methods)
{
val = Py_BuildValue("s", method.method_name.c_str());
PyDict_SetItemString(methodsDict, "mname", val); Py_DECREF(val);
Expand Down
Loading

0 comments on commit db3f4f1

Please sign in to comment.