Skip to content

Commit

Permalink
enh(engine/otlp): new otlp gRPC server
Browse files Browse the repository at this point in the history
* gRPC fix.

REFS: MON-34074
  • Loading branch information
jean-christophe81 authored May 31, 2024
1 parent 54634ba commit 97a132a
Show file tree
Hide file tree
Showing 51 changed files with 2,559 additions and 272 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,9 @@ find_package(spdlog CONFIG REQUIRED)
find_package(gRPC CONFIG REQUIRED)
find_package(Protobuf REQUIRED)
find_package(nlohmann_json CONFIG REQUIRED)
find_package(opentelemetry-cpp REQUIRED)
find_package(GTest CONFIG REQUIRED)
find_package(CURL REQUIRED)
find_package(Boost REQUIRED)
find_package(Boost REQUIRED COMPONENTS url)
find_package(ryml CONFIG REQUIRED)
add_definitions("-DSPDLOG_FMT_EXTERNAL")

Expand Down
11 changes: 4 additions & 7 deletions broker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ add_custom_target("target_broker_message" DEPENDS "${SRC_DIR}/broker.pb.cc"
"${SRC_DIR}/broker.pb.h")

include_directories(${SRC_DIR} ${CMAKE_SOURCE_DIR}/common/src
${CMAKE_SOURCE_DIR}/common/inc)
${CMAKE_SOURCE_DIR}/common/inc
${CMAKE_SOURCE_DIR}/bbdo)

add_library(berpc STATIC ${SRC_DIR}/broker.grpc.pb.cc ${SRC_DIR}/broker.pb.cc
${SRC_DIR}/broker.grpc.pb.h ${SRC_DIR}/broker.pb.h)
Expand Down Expand Up @@ -446,6 +447,7 @@ target_link_libraries(
bbdo_bbdo
pb_bbdo_lib
pb_extcmd_lib
pb_open_telemetry_lib
berpc
z
spdlog::spdlog
Expand All @@ -460,10 +462,6 @@ target_link_libraries(
rokerbase
crypto
ssl
gRPC::gpr
gRPC::grpc
gRPC::grpc++
gRPC::grpc++_alts
pthread
dl)

Expand Down Expand Up @@ -494,7 +492,6 @@ target_link_libraries(
# spdlog::spdlog
gRPC::gpr
gRPC::grpc
gRPC::grpc++
gRPC::grpc++_alts)

# Centreon Broker Watchdog
Expand All @@ -513,7 +510,7 @@ add_subdirectory(core/sql)
# Generator module.
add_broker_module(GENERATOR OFF)
add_broker_module(STATS ON)
add_broker_module(STATS_EXPORTER ON)
#add_broker_module(STATS_EXPORTER OFF)
add_broker_module(NEB ON)
add_broker_module(RRD ON)
add_broker_module(UNIFIED_SQL ON)
Expand Down
1 change: 1 addition & 0 deletions broker/bam/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ target_link_libraries("${BAM}" bbdo_storage bbdo_bam pb_bam_lib
spdlog::spdlog)
target_precompile_headers(${BAM} PRIVATE precomp_inc/precomp.hpp)
set_target_properties("${BAM}" PROPERTIES PREFIX "")
add_dependencies("${BAM}" pb_open_telemetry_lib)

# Testing.
if(WITH_TESTING)
Expand Down
17 changes: 15 additions & 2 deletions broker/grpc/inc/com/centreon/broker/grpc/acceptor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace com::centreon::broker::grpc {
*
*/
class service_impl
: public com::centreon::broker::stream::centreon_bbdo::CallbackService,
: public com::centreon::broker::stream::centreon_bbdo::Service,
public std::enable_shared_from_this<service_impl> {
grpc_config::pointer _conf;

Expand All @@ -46,9 +46,22 @@ class service_impl
public:
service_impl(const grpc_config::pointer& conf);

void init();

// disable synchronous version of this method
::grpc::Status exchange(
::grpc::ServerContext* /*context*/,
::grpc::ServerReaderWriter<
::com::centreon::broker::stream::CentreonEvent,
::com::centreon::broker::stream::CentreonEvent>* /*stream*/)
override {
abort();
return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "");
}

::grpc::ServerBidiReactor<::com::centreon::broker::stream::CentreonEvent,
::com::centreon::broker::stream::CentreonEvent>*
exchange(::grpc::CallbackServerContext* context) override;
exchange(::grpc::CallbackServerContext* context);

const grpc_config::pointer& get_conf() const { return _conf; }

Expand Down
8 changes: 0 additions & 8 deletions broker/grpc/inc/com/centreon/broker/grpc/stream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,6 @@ std::ostream& operator<<(std::ostream&, const CentreonEvent&);
namespace grpc {

extern const std::string authorization_header;
constexpr uint32_t calc_accept_all_compression_mask() {
uint32_t ret = 0;
for (size_t algo_ind = 0; algo_ind < GRPC_COMPRESS_ALGORITHMS_COUNT;
algo_ind++) {
ret += (1u << algo_ind);
}
return ret;
}

struct detail_centreon_event;
std::ostream& operator<<(std::ostream&, const detail_centreon_event&);
Expand Down
18 changes: 17 additions & 1 deletion broker/grpc/src/acceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "com/centreon/broker/grpc/acceptor.hh"
#include "com/centreon/broker/grpc/stream.hh"
#include "com/centreon/common/pool.hh"
#include "common/log_v2/log_v2.hh"

using namespace com::centreon::broker;
Expand Down Expand Up @@ -95,6 +96,20 @@ void server_stream::OnDone() {
*/
service_impl::service_impl(const grpc_config::pointer& conf) : _conf(conf) {}

/**
* @brief to call after construction
*
*/
void service_impl::init() {
::grpc::Service::MarkMethodCallback(
0, new ::grpc::internal::CallbackBidiHandler<
::com::centreon::broker::stream::CentreonEvent,
::com::centreon::broker::stream::CentreonEvent>(
[me = shared_from_this()](::grpc::CallbackServerContext* context) {
return me->exchange(context);
}));
}

/**
* @brief called on every stream creation
* every accepted stream is pushed in _wait_to_open queue
Expand Down Expand Up @@ -136,7 +151,7 @@ service_impl::exchange(::grpc::CallbackServerContext* context) {
server_stream::register_stream(next_stream);
next_stream->start_read();
{
std::unique_lock l(_wait_m);
std::lock_guard l(_wait_m);
_wait_to_open.push_back(next_stream);
}
_wait_cond.notify_one();
Expand Down Expand Up @@ -249,6 +264,7 @@ acceptor::acceptor(const grpc_config::pointer& conf)
_init([this](::grpc::ServerBuilder& builder) {
_service = std::make_shared<service_impl>(
std::static_pointer_cast<grpc_config>(get_conf()));
_service->init();
builder.RegisterService(_service.get());
});
}
Expand Down
3 changes: 2 additions & 1 deletion broker/lua/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ add_dependencies(${LUA}
pb_neb_lib
pb_header_lib
pb_storage_lib
pb_bam_lib)
pb_bam_lib
pb_open_telemetry_lib)

target_link_libraries("${LUA}" ${LUA_LIBRARIES} crypto ssl bbdo_storage bbdo_bam spdlog::spdlog pb_storage_lib)
target_precompile_headers(${LUA} PRIVATE precomp_inc/precomp.hpp)
Expand Down
2 changes: 2 additions & 0 deletions broker/neb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ endif()
set_target_properties("${CBMOD}" PROPERTIES PREFIX "")
target_precompile_headers(${CBMOD} PRIVATE precomp_inc/precomp.hpp)

target_include_directories(${CBMOD} PRIVATE ${CMAKE_SOURCE_DIR}/bbdo)

# Testing.
if(WITH_TESTING)
set(TESTS_SOURCES
Expand Down
3 changes: 2 additions & 1 deletion broker/simu/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ add_dependencies(${SIMU}
pb_neb_lib
pb_header_lib
pb_storage_lib
pb_bam_lib)
pb_bam_lib
pb_open_telemetry_lib)

target_link_libraries("${SIMU}" ${LUA_LIBRARIES} spdlog::spdlog)
target_precompile_headers(${SIMU} PRIVATE precomp_inc/precomp.hpp)
Expand Down
1 change: 1 addition & 0 deletions broker/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ add_library("${SQL}" SHARED
add_dependencies(${SQL}
pb_neb_lib
pb_header_lib
pb_open_telemetry_lib
)

set_target_properties("${SQL}" PROPERTIES
Expand Down
1 change: 1 addition & 0 deletions broker/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ add_dependencies(conflictmgr
pb_neb_lib
pb_header_lib
pb_storage_lib
pb_open_telemetry_lib
)

set_target_properties(conflictmgr PROPERTIES COMPILE_FLAGS "-fPIC")
Expand Down
4 changes: 3 additions & 1 deletion broker/unified_sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ add_library(
set_target_properties("${UNIFIED_SQL}" PROPERTIES PREFIX "" COMPILE_FLAGS
"-fPIC")
add_dependencies("${UNIFIED_SQL}" target_rebuild_message
target_remove_graph_message target_neb)
target_remove_graph_message
target_neb
pb_open_telemetry_lib)
target_precompile_headers(${UNIFIED_SQL} PRIVATE precomp_inc/precomp.hpp)
target_link_libraries(
${UNIFIED_SQL}
Expand Down
37 changes: 25 additions & 12 deletions common/grpc/inc/com/centreon/common/grpc/grpc_config.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,24 @@ class grpc_config {
* server case: address/port to listen
*
*/
const std::string _hostport;
const bool _crypted = false;
const std::string _certificate, _cert_key, _ca_cert;
const std::string _ca_name;
const bool _compress;
const int _second_keepalive_interval;
std::string _hostport;
bool _crypted = false;
std::string _certificate, _cert_key, _ca_cert;
std::string _ca_name;
bool _compress;
int _second_keepalive_interval;

public:
using pointer = std::shared_ptr<grpc_config>;

grpc_config() : _compress(false), _second_keepalive_interval(30) {}
grpc_config(const std::string& hostp)
: _hostport(hostp), _compress(false), _second_keepalive_interval(30) {}
grpc_config(const std::string& hostp, bool crypted)
: _hostport(hostp),
_crypted(crypted),
_compress(false),
_second_keepalive_interval(30) {}
grpc_config(const std::string& hostp,
bool crypted,
const std::string& certificate,
Expand All @@ -78,16 +83,24 @@ class grpc_config {
_compress(compression),
_second_keepalive_interval(second_keepalive_interval) {}

constexpr const std::string& get_hostport() const { return _hostport; }
constexpr bool is_crypted() const { return _crypted; }
constexpr const std::string& get_cert() const { return _certificate; }
constexpr const std::string& get_key() const { return _cert_key; }
constexpr const std::string& get_ca() const { return _ca_cert; }
const std::string& get_hostport() const { return _hostport; }
bool is_crypted() const { return _crypted; }
const std::string& get_cert() const { return _certificate; }
const std::string& get_key() const { return _cert_key; }
const std::string& get_ca() const { return _ca_cert; }
const std::string& get_ca_name() const { return _ca_name; }
constexpr bool is_compressed() const { return _compress; }
bool is_compressed() const { return _compress; }
int get_second_keepalive_interval() const {
return _second_keepalive_interval;
}

bool operator==(const grpc_config& right) const {
return _hostport == right._hostport && _crypted == right._crypted &&
_certificate == right._certificate && _cert_key == right._cert_key &&
_ca_cert == right._ca_cert && _ca_name == right._ca_name &&
_compress == right._compress &&
_second_keepalive_interval == right._second_keepalive_interval;
}
};
} // namespace com::centreon::common::grpc

Expand Down
2 changes: 2 additions & 0 deletions common/grpc/inc/com/centreon/common/grpc/grpc_server.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class grpc_server_base {

virtual ~grpc_server_base();

void shutdown(const std::chrono::system_clock::duration& timeout);

grpc_server_base(const grpc_server_base&) = delete;
grpc_server_base& operator=(const grpc_server_base&) = delete;

Expand Down
27 changes: 20 additions & 7 deletions common/grpc/src/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,25 @@ void grpc_server_base::_init(const builder_option& options) {
*
*/
grpc_server_base::~grpc_server_base() {
if (_server) {
SPDLOG_LOGGER_DEBUG(_logger, "begin shutdown of grpc server {} ",
_conf->get_hostport());
_server->Shutdown(std::chrono::system_clock::now() +
std::chrono::seconds(15));
SPDLOG_LOGGER_DEBUG(_logger, "end shutdown of grpc server {} ",
_conf->get_hostport());
shutdown(std::chrono::seconds(15));
}

/**
* @brief shutdown server
*
* @param timeout after this timeout, grpc server will be stopped
*/
void grpc_server_base::shutdown(
const std::chrono::system_clock::duration& timeout) {
std::unique_ptr<::grpc::Server> to_shutdown;
if (!_server) {
return;
}
to_shutdown = std::move(_server);
SPDLOG_LOGGER_INFO(_logger, "{:p} begin shutdown of grpc server {}",
static_cast<const void*>(this), _conf->get_hostport());
to_shutdown->Shutdown(std::chrono::system_clock::now() + timeout);
to_shutdown->Wait();
SPDLOG_LOGGER_INFO(_logger, "{:p} end shutdown of grpc server {}",
static_cast<const void*>(this), _conf->get_hostport());
}
13 changes: 11 additions & 2 deletions common/http/inc/com/centreon/common/http/http_config.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@

namespace com::centreon::common::http {

using system_clock = std::chrono::system_clock;
using time_point = system_clock::time_point;
using duration = system_clock::duration;

/**
* @brief this class is a bean that contains all config parameters
*
Expand All @@ -42,6 +46,8 @@ class http_config {
asio::ssl::context_base::method _ssl_method;
// path of certificate file
std::string _certificate_path;
// path to key file (server case)
std::string _key_path;

public:
using pointer = std::shared_ptr<http_config>;
Expand All @@ -59,7 +65,8 @@ class http_config {
unsigned max_connections = 10,
asio::ssl::context_base::method ssl_method =
asio::ssl::context_base::tlsv13_client,
const std::string& certificate_path = "")
const std::string& certificate_path = "",
const std::string& key_path = "")
: _endpoint(endpoint),
_server_name(server_name),
_crypted(crypted),
Expand All @@ -72,7 +79,8 @@ class http_config {
_default_http_keepalive_duration(default_http_keepalive_duration),
_max_connections(max_connections),
_ssl_method(ssl_method),
_certificate_path(certificate_path) {}
_certificate_path(certificate_path),
_key_path(key_path) {}

http_config()
: _crypted(false),
Expand All @@ -97,6 +105,7 @@ class http_config {
unsigned get_max_connections() const { return _max_connections; }
asio::ssl::context_base::method get_ssl_method() const { return _ssl_method; }
const std::string& get_certificate_path() const { return _certificate_path; }
const std::string& get_key_path() const { return _key_path; }
};

} // namespace com::centreon::common::http
Expand Down
Loading

5 comments on commit 97a132a

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
21 1 0 22 95.45 17m37.541209s

Failed Tests

Name Message ⏱️ Duration Suite
not16 Service (host_3,service_3) should be OK HARD 76.376 s Notifications

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
8 2 0 10 80.00 5m13.880662s

Failed Tests

Name Message ⏱️ Duration Suite
BRRDRM1 index_id=6707 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 53.075 s Rrd
BRRDRMU1 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 65.781 s Rrd

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
8 2 0 10 80.00 5m7.004070999s

Failed Tests

Name Message ⏱️ Duration Suite
BRRDRM1 index_id=6397 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 40.113 s Rrd
BRRDRMU1 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 56.441 s Rrd

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
8 2 0 10 80.00 4m59.665189s

Failed Tests

Name Message ⏱️ Duration Suite
BRRDRM1 index_id=6346 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 40.769 s Rrd
BRRDRMU1 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 50.796 s Rrd

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Robot Results

✅ Passed ❌ Failed ⏭️ Skipped Total Pass % ⏱️ Duration
16 4 0 20 80.00 9m46.689059999s

Failed Tests

Name Message ⏱️ Duration Suite
BRRDRM1 index_id=6647 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 43.793 s Rrd
BRRDRMU1 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 50.806 s Rrd
BRRDRM1 index_id=6481 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 41.441 s Rrd
BRRDRMU1 Data before RRD rebuild contain index_id % 3. The expected average is 100 if modulo==0, 75 if modulo==1, 0 if modulo==2 . 50.743 s Rrd

Please sign in to comment.