Skip to content

Commit

Permalink
enh(broker): new grpc module
Browse files Browse the repository at this point in the history
REFS: MON-12283
  • Loading branch information
jean-christophe81 authored and bouda1 committed Apr 21, 2022
1 parent 1c66131 commit 20c9f23
Show file tree
Hide file tree
Showing 56 changed files with 3,616 additions and 99 deletions.
9 changes: 8 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

# Set necessary settings.
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
cmake_minimum_required(VERSION 3.1)
cmake_minimum_required(VERSION 3.16)
project("Centreon Collect" C CXX)
if (NOT CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
message(FATAL_ERROR "You can build broker with g++ or clang++. CMake will exit.")
Expand Down Expand Up @@ -63,6 +63,13 @@ set(USER_ENGINE centreon-engine)
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup(TARGETS)

#find_package(Boost 1.78.0 REQUIRED)

# if (NOT Boost_FOUND)
# message(FATAL_ERROR "boost mandatory")
# endif()


include(GNUInstallDirs)

# var directories.
Expand Down
6 changes: 5 additions & 1 deletion centreon-broker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ set(LIBROKER_SOURCES
${SRC_DIR}/instance_broadcast.cc
${SRC_DIR}/io/data.cc
${SRC_DIR}/io/endpoint.cc
${SRC_DIR}/io/limit_endpoint.cc
${SRC_DIR}/io/events.cc
${SRC_DIR}/io/factory.cc
${SRC_DIR}/io/protocols.cc
Expand Down Expand Up @@ -393,6 +394,7 @@ set(LIBROKER_SOURCES
${INC_DIR}/instance_broadcast.hh
${INC_DIR}/io/data.hh
${INC_DIR}/io/endpoint.hh
${INC_DIR}/io/limit_endpoint.hh
${INC_DIR}/io/event_info.hh
${INC_DIR}/io/events.hh
${INC_DIR}/io/factory.hh
Expand All @@ -413,6 +415,7 @@ set(LIBROKER_SOURCES
${INC_DIR}/misc/stringifier.hh
${INC_DIR}/misc/time.hh
${INC_DIR}/misc/tokenizer.hh
${INC_DIR}/misc/trash.hh
${INC_DIR}/misc/variant.hh
${INC_DIR}/modules/handle.hh
${INC_DIR}/multiplexing/engine.hh
Expand Down Expand Up @@ -466,7 +469,7 @@ add_executable(cbd ${SRC_DIR}/main.cc)

#Flags needed to include all symbols in binary.
target_link_libraries(cbd
"-Wl,--whole-archive" rokerbase roker "-Wl,--no-whole-archive" CONAN_PKG::nlohmann_json CONAN_PKG::fmt CONAN_PKG::spdlog CONAN_PKG::grpc )
"-export-dynamic" "-Wl,--whole-archive" rokerbase roker "-Wl,--no-whole-archive" CONAN_PKG::nlohmann_json CONAN_PKG::fmt -L${CONAN_LIB_DIRS_GRPC} "-Wl,--whole-archive" grpc++ "-Wl,--no-whole-archive" CONAN_PKG::spdlog CONAN_PKG::grpc )

# Centreon Broker Watchdog
option(WITH_CBWD "Build centreon broker watchdog." ON)
Expand All @@ -492,6 +495,7 @@ add_broker_module(TCP ON)
add_broker_module(TLS ON)
add_broker_module(TLS2 OFF)
add_broker_module(DUMP OFF)
add_broker_module(GRPC ON)

# Lua module.
option(WITH_MODULE_LUA "Build lua module." ON)
Expand Down
49 changes: 49 additions & 0 deletions centreon-broker/core/inc/com/centreon/broker/io/limit_endpoint.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
** Copyright 2022 Centreon
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
**
** For more information : contact@centreon.com
*/

#ifndef CCB_IO_LIMIT_ENDPOINT_HH
#define CCB_IO_LIMIT_ENDPOINT_HH

#include "endpoint.hh"

CCB_BEGIN()

namespace io {

class limit_endpoint : public endpoint {
protected:
/* How many consecutive calls to is_ready() */
mutable int16_t _is_ready_count;
/* The time of the last call to is_ready() */
mutable std::time_t _is_ready_now;

public:
limit_endpoint(bool is_accptr)
: endpoint(is_accptr), _is_ready_count(0), _is_ready_now(0) {}

std::unique_ptr<stream> open() override;
bool is_ready() const override;

virtual std::unique_ptr<stream> create_stream() = 0;
};

} // namespace io

CCB_END()

#endif // !CCB_IO_LIMIT_ENDPOINT_HH
2 changes: 2 additions & 0 deletions centreon-broker/core/inc/com/centreon/broker/log_v2.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class log_v2 {
std::shared_ptr<spdlog::logger> _sql_log;
std::shared_ptr<spdlog::logger> _tcp_log;
std::shared_ptr<spdlog::logger> _tls_log;
std::shared_ptr<spdlog::logger> _grpc_log;
std::mutex _load_m;

log_v2();
Expand All @@ -72,6 +73,7 @@ class log_v2 {
static std::shared_ptr<spdlog::logger> sql();
static std::shared_ptr<spdlog::logger> tcp();
static std::shared_ptr<spdlog::logger> tls();
static std::shared_ptr<spdlog::logger> grpc();
static bool contains_logger(const std::string& logger);
static bool contains_level(const std::string& level);
};
Expand Down
3 changes: 3 additions & 0 deletions centreon-broker/core/inc/com/centreon/broker/misc/string.hh
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ fmt::string_view truncate(const T& str, size_t s) {
size_t adjust_size_utf8(const std::string& str, size_t s);
std::string escape(const std::string& str, size_t s);

std::string debug_buf(const char* data, int32_t size, int max_len = 10);


} // namespace string
} // namespace misc

Expand Down
102 changes: 102 additions & 0 deletions centreon-broker/core/inc/com/centreon/broker/misc/trash.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
** Copyright 2022 Centreon
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
**
** For more information : contact@centreon.com
*/

#ifndef CCB_MISC_TRASH_HH
#define CCB_MISC_TRASH_HH

#include <forward_list>
#include <ctime>
#include "com/centreon/broker/namespace.hh"

CCB_BEGIN()

namespace misc {
/**
* @brief this class is a delayed trash where objects are thrown on. Then they are dereferenced after a time passed in parameter
* it s can be used for objects involved in unsafe asynchronous operations where we aren t sure of objects owning by asynchronous layers
*
* TODO: if we used boost, replace forward_list by a multi_index_container
* @tparam T objects to delete
*/
template <class T>
class trash {
public:
using element_pointer = std::shared_ptr<T>;
protected:
struct element_erase_pair{
element_erase_pair() {}
element_erase_pair(const element_pointer & element, time_t time_to_eras)
:elem(element), time_to_erase(time_to_eras) {}

element_pointer elem;
time_t time_to_erase;
};

using trash_content = std::forward_list<element_erase_pair>;

trash_content _content;
std::mutex _protect;

void clean();
public:
void to_trash(const element_pointer & to_throw, time_t time_to_erase);
void refresh_time_to_erase(const element_pointer & to_throw, time_t time_to_erase);
};

/**
* @brief dereference perempted objects
* not mutex protected
* @tparam T
*/
template <class T>
void trash<T>::clean() {
time_t now = time(nullptr);
_content.remove_if([now](const element_erase_pair & toTest) { return toTest.time_to_erase < now;});
}

template <class T>
void trash<T>::to_trash(const element_pointer & to_throw, time_t time_to_erase) {
std::unique_lock<std::mutex> l(_protect);
clean();
for (element_erase_pair toUpdate: _content) {
if (toUpdate.elem == to_throw) {
toUpdate.time_to_erase = time_to_erase;
return;
}
}
_content.emplace_front(to_throw, time_to_erase);
}

template <class T>
void trash<T>::refresh_time_to_erase(const element_pointer & to_update, time_t time_to_erase) {
std::unique_lock<std::mutex> l(_protect);
clean();
for (element_erase_pair toUpdate: _content) {
if (toUpdate.elem == to_update) {
toUpdate.time_to_erase = time_to_erase;
return;
}
}
}


} // namespace misc

CCB_END()

#endif // !CCB_MISC_TOKENIZER_HH
27 changes: 15 additions & 12 deletions centreon-broker/core/src/config/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
#include <streambuf>

#include "com/centreon/broker/log_v2.hh"
#include "com/centreon/broker/misc/string.hh"
#include "com/centreon/broker/misc/filesystem.hh"
#include "com/centreon/broker/misc/string.hh"
#include "com/centreon/exceptions/msg_fmt.hh"

using namespace com::centreon::exceptions;
Expand Down Expand Up @@ -166,17 +166,17 @@ state parser::parse(std::string const& file) {
retval, &state::module_directory,
&json::is_string)) {
if (!misc::filesystem::readable(retval.module_directory()))
throw msg_fmt("The module directory '{}' is not accessible", retval.module_directory());
}
else if (get_conf<state>({it.key(), it.value()}, "cache_directory",
retval, &state::cache_directory,
&json::is_string)) {
throw msg_fmt("The module directory '{}' is not accessible",
retval.module_directory());
} else if (get_conf<state>({it.key(), it.value()}, "cache_directory",
retval, &state::cache_directory,
&json::is_string)) {
if (!misc::filesystem::readable(retval.cache_directory()))
throw msg_fmt("The cache directory '{}' is not accessible", retval.cache_directory());
}
else if (get_conf<int, state>({it.key(), it.value()}, "pool_size",
retval, &state::pool_size,
&json::is_number, &json::get<int>))
throw msg_fmt("The cache directory '{}' is not accessible",
retval.cache_directory());
} else if (get_conf<int, state>({it.key(), it.value()}, "pool_size",
retval, &state::pool_size,
&json::is_number, &json::get<int>))
;
else if (get_conf<state>({it.key(), it.value()}, "command_file", retval,
&state::command_file, &json::is_string))
Expand Down Expand Up @@ -249,7 +249,8 @@ state parser::parse(std::string const& file) {
conf.directory = "/var/log/centreon-broker";

if (!misc::filesystem::writable(conf.directory))
throw msg_fmt("The log directory '{}' is not writable", conf.directory);
throw msg_fmt("The log directory '{}' is not writable",
conf.directory);

conf.filename = "";

Expand Down Expand Up @@ -423,6 +424,8 @@ void parser::_parse_endpoint(json const& elem,
module = "70-graphite.so";
else if (e.type == "influxdb")
module = "70-influxdb.so";
else if (e.type == "grpc")
module = "50-grpc.so";
else
throw msg_fmt("config parser: endpoint of invalid type '{}'", e.type);
}
Expand Down
59 changes: 59 additions & 0 deletions centreon-broker/core/src/io/limit_endpoint.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
** Copyright 2022 Centreon
**
** Licensed under the Apache License, Version 2.0 (the "License");
** you may not use this file except in compliance with the License.
** You may obtain a copy of the License at
**
** http://www.apache.org/licenses/LICENSE-2.0
**
** Unless required by applicable law or agreed to in writing, software
** distributed under the License is distributed on an "AS IS" BASIS,
** WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
** See the License for the specific language governing permissions and
** limitations under the License.
**
** For more information : contact@centreon.com
*/
#include "com/centreon/broker/io/limit_endpoint.hh"


using namespace com::centreon::broker::io;

/**
* @brief Connect to the remote host.
*
* @return The connection object.
*/
std::unique_ptr<stream> limit_endpoint::open() {
// Launch connection process.
try {
std::unique_ptr<stream> retval = create_stream();
_is_ready_count = 0;
return retval;
} catch (const std::exception& e) {
if (_is_ready_count < 30)
_is_ready_count++;
return nullptr;
}
}

/**
* @brief Return true when it is time to attempt a new connection. The idea is
* to increase the duration between two calls each time this function is called
* without connection between. So if now server is available, we should not
* try to connect too often, but if the connection failed one time, it should
* be fast to connect again.
*
* @return a boolean.
*/
bool limit_endpoint::is_ready() const {
time_t now;
std::time(&now);
if (now - _is_ready_now > (1 << _is_ready_count)) {
_is_ready_now = now;
return true;
}
return false;
}

Loading

0 comments on commit 20c9f23

Please sign in to comment.