Skip to content

Commit

Permalink
enh(common): grpc server/client moved to common and new otlp message …
Browse files Browse the repository at this point in the history
…for bbdo.

REFS: MON-34002
  • Loading branch information
jean-christophe81 authored May 28, 2024
1 parent 974ca8b commit 85e3674
Show file tree
Hide file tree
Showing 39 changed files with 796 additions and 239 deletions.
3 changes: 3 additions & 0 deletions .github/scripts/collect-test-robot.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ echo '/tmp/core.%p' > /proc/sys/kernel/core_pattern
#remove git dubious ownership
/usr/bin/git config --global --add safe.directory $PWD

echo "###### git clone opentelemetry-proto #######"
git clone --depth=1 --single-branch https://github.com/open-telemetry/opentelemetry-proto.git opentelemetry-proto

echo "##### Starting tests #####"
cd tests
./init-proto.sh
Expand Down
18 changes: 18 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,24 @@ set(OTLP_LIB_DIR ${opentelemetry-cpp_DIR}/../../lib)
set(VCPKG_INCLUDE_DIR ${Protobuf_INCLUDE_DIR})
include(GNUInstallDirs)

#import opentelemetry-proto
add_custom_command(
OUTPUT ${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto
${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto
${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto
${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto
COMMENT "get opentelemetry proto files from git repository"
COMMAND /bin/rm -rf ${CMAKE_SOURCE_DIR}/opentelemetry-proto
COMMAND git ARGS clone --depth=1 --single-branch https://github.com/open-telemetry/opentelemetry-proto.git ${CMAKE_SOURCE_DIR}/opentelemetry-proto
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
)

add_custom_target(opentelemetry-proto-files DEPENDS ${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto
${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/metrics/v1/metrics.proto
${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/common/v1/common.proto
${CMAKE_SOURCE_DIR}/opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto
)

# var directories.
set(BROKER_VAR_LOG_DIR
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/log/centreon-broker")
Expand Down
32 changes: 32 additions & 0 deletions bbdo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,38 @@ add_dependencies(pb_remove_graph_message_lib target_remove_graph_message
set_target_properties(pb_remove_graph_message_lib
PROPERTIES POSITION_INDEPENDENT_CODE ON)

set(otl_protobuf_files
opentelemetry/proto/collector/metrics/v1/metrics_service
opentelemetry/proto/metrics/v1/metrics
opentelemetry/proto/common/v1/common
opentelemetry/proto/resource/v1/resource
)
foreach(name IN LISTS otl_protobuf_files)
set(proto_file "${name}.proto")
add_custom_command(
OUTPUT "${CMAKE_SOURCE_DIR}/bbdo/${name}.pb.cc"
COMMENT "Generating interface files of the otl file ${proto_file}"
#DEPENDS ${CMAKE_BINARY_DIR}/opentelemetry-proto/${proto_file}
DEPENDS opentelemetry-proto-files
COMMAND
${Protobuf_PROTOC_EXECUTABLE} ARGS --cpp_out=${CMAKE_SOURCE_DIR}/bbdo
--proto_path=${CMAKE_SOURCE_DIR}/opentelemetry-proto ${proto_file}
VERBATIM)
endforeach()

add_library(pb_open_telemetry_lib STATIC
${CMAKE_SOURCE_DIR}/bbdo/opentelemetry/proto/collector/metrics/v1/metrics_service.pb.cc
${CMAKE_SOURCE_DIR}/bbdo/opentelemetry/proto/metrics/v1/metrics.pb.cc
${CMAKE_SOURCE_DIR}/bbdo/opentelemetry/proto/common/v1/common.pb.cc
${CMAKE_SOURCE_DIR}/bbdo/opentelemetry/proto/resource/v1/resource.pb.cc
)

target_include_directories(pb_open_telemetry_lib BEFORE PRIVATE ${CMAKE_SOURCE_DIR}/bbdo)


set_target_properties(pb_open_telemetry_lib
PROPERTIES POSITION_INDEPENDENT_CODE ON)

macro(get_protobuf_files name)
set_source_files_properties("${CMAKE_SOURCE_DIR}/bbdo/${name}.pb.cc"
PROPERTIES GENERATED TRUE)
Expand Down
3 changes: 3 additions & 0 deletions bbdo/events.hh
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ enum data_element {
de_pb_status = 10,
de_pb_index_mapping = 11,
de_pb_metric_mapping = 12,
de_pb_otl_metrics =
13 // contain an
// ::opentelemetry::proto::collector::metrics::v1::ExportMetricsServiceRequest
};
}
namespace bam {
Expand Down
18 changes: 11 additions & 7 deletions broker/bam/test/monitoring_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
#include "com/centreon/broker/config/applier/init.hh"
#include "com/centreon/broker/multiplexing/engine.hh"
#include "com/centreon/broker/neb/acknowledgement.hh"
#include "common/log_v2/log_v2.hh"

using log_v2 = com::centreon::common::log_v2::log_v2;
using namespace com::centreon::broker;
using namespace com::centreon::broker::bam;

class BamMonitoringStream : public testing::Test {
void SetUp() override {
config::applier::init(0, "test_broker", 0);
}
void SetUp() override { config::applier::init(0, "test_broker", 0); }
void TearDown() override { config::applier::deinit(); }
};

Expand All @@ -44,7 +44,8 @@ TEST_F(BamMonitoringStream, WriteKpi) {
std::shared_ptr<persistent_cache> cache;
std::unique_ptr<monitoring_stream> ms;

ASSERT_NO_THROW(ms.reset(new monitoring_stream("", cfg, storage, cache)));
ASSERT_NO_THROW(ms.reset(new monitoring_stream(
"", cfg, storage, cache, log_v2::instance().get(log_v2::BAM))));

std::shared_ptr<pb_kpi_status> st{std::make_shared<pb_kpi_status>()};
st->mut_obj().set_kpi_id(1);
Expand All @@ -61,7 +62,8 @@ TEST_F(BamMonitoringStream, WriteBA) {
std::shared_ptr<persistent_cache> cache;
std::unique_ptr<monitoring_stream> ms;

ASSERT_NO_THROW(ms.reset(new monitoring_stream("", cfg, storage, cache)));
ASSERT_NO_THROW(ms.reset(new monitoring_stream(
"", cfg, storage, cache, log_v2::instance().get(log_v2::BAM))));

std::shared_ptr<ba_status> st{std::make_shared<ba_status>(ba_status())};

Expand All @@ -77,7 +79,8 @@ TEST_F(BamMonitoringStream, WorkWithNoPendigMysqlRequest) {
std::shared_ptr<persistent_cache> cache;
std::unique_ptr<monitoring_stream> ms;

ASSERT_NO_THROW(ms.reset(new monitoring_stream("", cfg, storage, cache)));
ASSERT_NO_THROW(ms.reset(new monitoring_stream(
"", cfg, storage, cache, log_v2::instance().get(log_v2::BAM))));

std::shared_ptr<ba_status> st{std::make_shared<ba_status>(ba_status())};

Expand All @@ -98,7 +101,8 @@ TEST_F(BamMonitoringStream, WorkWithPendigMysqlRequest) {
std::shared_ptr<persistent_cache> cache;
std::unique_ptr<monitoring_stream> ms;

ASSERT_NO_THROW(ms.reset(new monitoring_stream("", cfg, storage, cache)));
ASSERT_NO_THROW(ms.reset(new monitoring_stream(
"", cfg, storage, cache, log_v2::instance().get(log_v2::BAM))));

std::shared_ptr<ba_status> st{std::make_shared<ba_status>(ba_status())};

Expand Down
29 changes: 17 additions & 12 deletions broker/core/test/mysql/mysql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

#include "com/centreon/broker/config/applier/init.hh"
#include "com/centreon/broker/config/applier/modules.hh"
#include "com/centreon/broker/log_v2.hh"
#include "com/centreon/broker/neb/custom_variable.hh"
#include "com/centreon/broker/neb/downtime.hh"
#include "com/centreon/broker/neb/host.hh"
Expand All @@ -44,10 +43,12 @@
#include "com/centreon/broker/sql/mysql_multi_insert.hh"
#include "com/centreon/broker/sql/query_preparator.hh"
#include "com/centreon/exceptions/msg_fmt.hh"
#include "common/log_v2/log_v2.hh"

using msg_fmt = com::centreon::exceptions::msg_fmt;
using namespace com::centreon::broker;
using namespace com::centreon::broker::database;
using log_v2 = com::centreon::common::log_v2::log_v2;

class DatabaseStorageTest : public ::testing::Test {
public:
Expand Down Expand Up @@ -503,7 +504,7 @@ TEST_F(DatabaseStorageTest, ConnectionOk) {
// }
//
TEST_F(DatabaseStorageTest, CustomVarStatement) {
config::applier::modules modules;
config::applier::modules modules(log_v2::instance().get(log_v2::SQL));
modules.load_file("./broker/neb/10-neb.so");
database_config db_cfg("MySQL", "127.0.0.1", MYSQL_SOCKET, 3306, "root",
"centreon", "centreon_storage", 5, true, 5);
Expand Down Expand Up @@ -2078,7 +2079,7 @@ TEST_F(DatabaseStorageTest, MySqlMultiInsert) {
ms->commit();
std::chrono::system_clock::time_point end_insert =
std::chrono::system_clock::now();
SPDLOG_LOGGER_INFO(log_v2::sql(),
SPDLOG_LOGGER_INFO(log_v2::instance().get(log_v2::SQL),
" insert {} rows in {} requests duration: {} seconds",
data_index, nb_request,
std::chrono::duration_cast<std::chrono::seconds>(
Expand Down Expand Up @@ -2128,7 +2129,7 @@ TEST_F(DatabaseStorageTest, MySqlMultiInsert) {
nb_request = inserter2.execute_queries(*ms);
ms->commit();
end_insert = std::chrono::system_clock::now();
SPDLOG_LOGGER_INFO(log_v2::sql(),
SPDLOG_LOGGER_INFO(log_v2::instance().get(log_v2::SQL),
" insert {} rows in {} requests duration: {} seconds",
data_index, nb_request,
std::chrono::duration_cast<std::chrono::seconds>(
Expand Down Expand Up @@ -2218,10 +2219,12 @@ TEST_F(DatabaseStorageTest, bulk_or_multi_bbdo_event_bulk) {
inserter->execute(*ms);
ms->commit();

log_v2::sql()->info("100000 rows inserted in {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - begin)
.count());
log_v2::instance()
.get(log_v2::SQL)
->info("100000 rows inserted in {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - begin)
.count());

std::promise<mysql_result> select_prom;
std::future<mysql_result> select_fut = select_prom.get_future();
Expand Down Expand Up @@ -2269,10 +2272,12 @@ TEST_F(DatabaseStorageTest, bulk_or_multi_bbdo_event_multi) {
inserter->execute(*ms);
ms->commit();

log_v2::sql()->info("100000 rows inserted in {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - begin)
.count());
log_v2::instance()
.get(log_v2::SQL)
->info("100000 rows inserted in {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now() - begin)
.count());

std::promise<mysql_result> select_prom;
std::future<mysql_result> select_fut = select_prom.get_future();
Expand Down
10 changes: 9 additions & 1 deletion broker/grpc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ set(MODULE_DIR "${PROJECT_SOURCE_DIR}/grpc")
set(INC_DIR "${MODULE_DIR}/inc/com/centreon/broker/grpc")
set(SRC_DIR "${MODULE_DIR}/src")
set(TEST_DIR "${MODULE_DIR}/test")
include_directories(${MODULE_DIR}/inc ${SRC_DIR} ${CMAKE_SOURCE_DIR}/bbdo ${CMAKE_SOURCE_DIR}/common)
include_directories(${MODULE_DIR}/inc
${SRC_DIR}
${CMAKE_SOURCE_DIR}/bbdo
${CMAKE_SOURCE_DIR}/common
${CMAKE_SOURCE_DIR}/common/grpc/inc)

# Sources.
set(SOURCES
Expand All @@ -43,6 +47,7 @@ add_library(${GRPC} SHARED ${SOURCES} )
set_target_properties(${GRPC} PROPERTIES PREFIX "")
target_link_libraries(
${GRPC}
centreon_grpc
"-Wl,--whole-archive"
pb_neb_lib
pb_storage_lib
Expand All @@ -51,6 +56,7 @@ target_link_libraries(
pb_tag_lib
pb_bam_lib
pb_extcmd_lib
pb_open_telemetry_lib
pb_rebuild_message_lib
pb_remove_graph_message_lib
pb_header_lib
Expand Down Expand Up @@ -81,6 +87,7 @@ add_custom_command(
COMMAND
${Protobuf_PROTOC_EXECUTABLE} ARGS --cpp_out=${SRC_DIR}
--proto_path=${MODULE_DIR} --proto_path=${CMAKE_SOURCE_DIR}/bbdo
--proto_path=${CMAKE_SOURCE_DIR}/opentelemetry-proto
grpc_stream.proto
VERBATIM)

Expand All @@ -92,6 +99,7 @@ add_custom_command(
${Protobuf_PROTOC_EXECUTABLE} ARGS --grpc_out=${SRC_DIR}
--plugin=protoc-gen-grpc=${GRPC_CPP_PLUGIN}
--proto_path=${MODULE_DIR} --proto_path=${CMAKE_SOURCE_DIR}/bbdo
--proto_path=${CMAKE_SOURCE_DIR}/opentelemetry-proto
grpc_stream.proto
VERBATIM
WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
Expand Down
29 changes: 29 additions & 0 deletions broker/grpc/generate_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

file_begin_content = """syntax = "proto3";
import "opentelemetry/proto/collector/metrics/v1/metrics_service.proto";
"""

file_message_centreon_event = """
Expand Down Expand Up @@ -218,6 +219,34 @@ class received_protobuf : public io::protobuf<T, Typ> {
"""

#The following message is not in bbdo protobuff files so we need to add manually.

file_message_centreon_event += f" opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest ExportMetricsServiceRequest_ = {one_of_index};\n"

cc_file_protobuf_to_event_function += """
case ::stream::CentreonEvent::kExportMetricsServiceRequest:
return std::make_shared<detail::received_protobuf<
::opentelemetry::proto::collector::metrics::v1::
ExportMetricsServiceRequest,
make_type(io::storage, storage::de_pb_otl_metrics)>>(
stream_content, &grpc_event_type::exportmetricsservicerequest_,
&grpc_event_type::mutable_exportmetricsservicerequest_);
"""

cc_file_create_event_with_data_function += """
case make_type(io::storage, storage::de_pb_otl_metrics):
ret = std::make_shared<event_with_data>(
event, reinterpret_cast<event_with_data::releaser_type>(
&grpc_event_type::release_exportmetricsservicerequest_));
ret->grpc_event.set_allocated_exportmetricsservicerequest_(
&std::static_pointer_cast<io::protobuf<
::opentelemetry::proto::collector::metrics::v1::
ExportMetricsServiceRequest,
make_type(io::storage, storage::de_pb_otl_metrics)>>(event)
->mut_obj());
break;
"""

with open(args.proto_file, 'w', encoding="utf-8") as fp:
fp.write(file_begin_content)
fp.write("""
Expand Down
5 changes: 3 additions & 2 deletions broker/grpc/inc/com/centreon/broker/grpc/acceptor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define CCB_GRPC_ACCEPTOR_HH

#include "com/centreon/broker/io/endpoint.hh"
#include "com/centreon/common/grpc/grpc_server.hh"
#include "grpc_config.hh"

namespace com::centreon::broker::grpc {
Expand Down Expand Up @@ -60,9 +61,9 @@ class service_impl
void unregister(const std::shared_ptr<io::stream>& to_unregister);
};

class acceptor : public io::endpoint {
class acceptor : public io::endpoint,
public com::centreon::common::grpc::grpc_server_base {
std::shared_ptr<service_impl> _service;
std::unique_ptr<::grpc::Server> _server;

public:
acceptor(const grpc_config::pointer& conf);
Expand Down
6 changes: 3 additions & 3 deletions broker/grpc/inc/com/centreon/broker/grpc/connector.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
#define CCB_GRPC_CONNECTOR_HH

#include "com/centreon/broker/io/limit_endpoint.hh"
#include "com/centreon/common/grpc/grpc_client.hh"
#include "grpc_config.hh"

namespace com::centreon::broker {

namespace grpc {
class connector : public io::limit_endpoint {
grpc_config::pointer _conf;
std::shared_ptr<::grpc::Channel> _channel;
class connector : public io::limit_endpoint,
public com::centreon::common::grpc::grpc_client_base {
std::unique_ptr<com::centreon::broker::stream::centreon_bbdo::Stub> _stub;

public:
Expand Down
Loading

3 comments on commit 85e3674

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
21 1 0 22 95.45 17m57.687660999s

Failed Tests

Name Message ⏱️ Duration Suite
not16 Service (host_1,service_1) should be OK HARD 91.525 s Notifications

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
13 1 0 14 92.86 4m20.226154s

Failed Tests

Name Message ⏱️ Duration Suite
BEUTAG2 New service should have a service group tag of id 4. 85.433 s Tags

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
31 1 0 32 96.88 10m23.538037s

Failed Tests

Name Message ⏱️ Duration Suite
BEEXTCMD_REVERSE_GRPC2 Engine badly stopped with 1 instances - code returned -11. 34.985 s External-Commands2

Please sign in to comment.