From 40a0b6c8eed4da81b2ebb6e9d91bbe8c8e68eae9 Mon Sep 17 00:00:00 2001 From: Jean Christophe Roques Date: Tue, 25 Jun 2024 11:14:46 +0200 Subject: [PATCH] telegraf conf accepts only one host add scheduler, check and check_exec class to agent --- .gitignore | 1 + agent/CMakeLists.txt | 6 + agent/doc/agent-doc.md | 3 +- agent/inc/com/centreon/agent/check.hh | 125 +++++ agent/inc/com/centreon/agent/check_exec.hh | 117 +++++ agent/inc/com/centreon/agent/config.hh | 6 +- agent/inc/com/centreon/agent/scheduler.hh | 209 ++++++++ agent/src/check.cc | 141 +++++ agent/src/check_exec.cc | 266 ++++++++++ agent/src/config.cc | 12 +- agent/src/main.cc | 19 +- agent/src/scheduler.cc | 485 ++++++++++++++++++ agent/test/CMakeLists.txt | 4 + agent/test/check_exec_test.cc | 113 ++++ agent/test/check_test.cc | 141 +++++ agent/test/scheduler_test.cc | 450 ++++++++++++++++ .../modules/opentelemetry/conf_helper.hh | 100 ++++ .../opentelemetry/telegraf/conf_server.hh | 2 +- .../opentelemetry/src/data_point_fifo.cc | 4 +- .../opentelemetry/src/telegraf/conf_server.cc | 24 +- tests/broker-engine/opentelemetry.robot | 53 +- 21 files changed, 2223 insertions(+), 58 deletions(-) create mode 100644 agent/inc/com/centreon/agent/check.hh create mode 100644 agent/inc/com/centreon/agent/check_exec.hh create mode 100644 agent/inc/com/centreon/agent/scheduler.hh create mode 100644 agent/src/check.cc create mode 100644 agent/src/check_exec.cc create mode 100644 agent/src/scheduler.cc create mode 100644 agent/test/check_exec_test.cc create mode 100644 agent/test/check_test.cc create mode 100644 agent/test/scheduler_test.cc create mode 100644 engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/conf_helper.hh diff --git a/.gitignore b/.gitignore index 77cd000405a..e783d4cbb85 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,7 @@ report.html # agent agent/scripts/centagent.service +agent/conf/centagent.json opentelemetry-proto # bbdo diff --git a/agent/CMakeLists.txt b/agent/CMakeLists.txt index 4d37d055ee2..7a8ec1a1036 100644 --- a/agent/CMakeLists.txt +++ b/agent/CMakeLists.txt @@ -99,12 +99,17 @@ add_custom_command( add_library(centagent_lib STATIC + ${SRC_DIR}/agent.grpc.pb.cc + ${SRC_DIR}/agent.pb.cc + ${SRC_DIR}/check.cc + ${SRC_DIR}/check_exec.cc ${SRC_DIR}/opentelemetry/proto/collector/metrics/v1/metrics_service.grpc.pb.cc ${SRC_DIR}/opentelemetry/proto/collector/metrics/v1/metrics_service.pb.cc ${SRC_DIR}/opentelemetry/proto/metrics/v1/metrics.pb.cc ${SRC_DIR}/opentelemetry/proto/common/v1/common.pb.cc ${SRC_DIR}/opentelemetry/proto/resource/v1/resource.pb.cc ${SRC_DIR}/config.cc + ${SRC_DIR}/scheduler.cc ) include_directories( @@ -127,6 +132,7 @@ target_link_libraries( centagent_lib centreon_common centreon_grpc + centreon_process -L${Boost_LIBRARY_DIR_RELEASE} boost_program_options fmt::fmt) diff --git a/agent/doc/agent-doc.md b/agent/doc/agent-doc.md index ab326f5fbdc..9793f411f5b 100644 --- a/agent/doc/agent-doc.md +++ b/agent/doc/agent-doc.md @@ -19,5 +19,6 @@ The first service check will start right now, the second one at 12:00:06, third We don't care about the duration of tests, we work with time points. In the previous example, the second check for the first service will be scheduled at 12:00:10 even if all other checks has not been yet started. -In case of check duration is too long, we might exceed maximum of concurrent checks. In that case checks will be executed as soon one will be ended. +In case of check duration is too long, we might exceed maximum of concurrent checks. In that case checks will be executed as soon one will be ended. But next calls will occur at check start + check_period. This means that the second check may start later than the scheduled time point (12:00:10) if the other first checks are too long. The order of checks is always respected even in case of a bottleneck. +For example, a check lambda has a start_expected to 12:00, because of bottleneck, it starts at 12:15. Next start_expected of check lambda will then be 12:15 + check_period. diff --git a/agent/inc/com/centreon/agent/check.hh b/agent/inc/com/centreon/agent/check.hh new file mode 100644 index 00000000000..c2808293e0e --- /dev/null +++ b/agent/inc/com/centreon/agent/check.hh @@ -0,0 +1,125 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#ifndef CENTREON_AGENT_CHECK_HH +#define CENTREON_AGENT_CHECK_HH + +#include "agent.pb.h" +#include "com/centreon/common/perfdata.hh" + +namespace com::centreon::agent { + +using engine_to_agent_request_ptr = + std::shared_ptr; + +using time_point = std::chrono::system_clock::time_point; +using duration = std::chrono::system_clock::duration; + +/** + * @brief base class for check + * start_expected is set by scheduler and increased by check_period on each + * check + * + */ +class check : public std::enable_shared_from_this { + public: + using completion_handler = std::function& caller, + int status, + const std::list& perfdata, + const std::list& outputs)>; + + private: + //_start_expected is set on construction on config receive + // it's updated on check_start and added of check_period on check completion + time_point _start_expected; + const std::string& _service; + const std::string& _command_name; + const std::string& _command_line; + // by owning a reference to the original request, we can get only reference to + // host, service and command_line + // on completion, this pointer is compared to the current config pointer. + // if not equal result is not processed + engine_to_agent_request_ptr _conf; + + asio::system_timer _time_out_timer; + + void _start_timeout_timer(const duration& timeout); + + bool _running_check = false; + // this index is used and incremented by on_completion to insure that + // async on_completion is called by the actual asynchronous check + unsigned _running_check_index = 0; + completion_handler _completion_handler; + + protected: + std::shared_ptr _io_context; + std::shared_ptr _logger; + + unsigned _get_running_check_index() const { return _running_check_index; } + const completion_handler& _get_completion_handler() const { + return _completion_handler; + } + + virtual void _timeout_timer_handler(const boost::system::error_code& err, + unsigned start_check_index); + + public: + using pointer = std::shared_ptr; + + check(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& command_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + completion_handler&& handler); + + virtual ~check() = default; + + struct pointer_start_compare { + bool operator()(const check::pointer& left, + const check::pointer& right) const { + return left->_start_expected < right->_start_expected; + } + }; + + void add_duration_to_start_expected(const duration& to_add); + + time_point get_start_expected() const { return _start_expected; } + + const std::string& get_service() const { return _service; } + + const std::string& get_command_name() const { return _command_name; } + + const std::string& get_command_line() const { return _command_line; } + + const engine_to_agent_request_ptr& get_conf() const { return _conf; } + + void on_completion(unsigned start_check_index, + unsigned status, + const std::list& perfdata, + const std::list& outputs); + + virtual void start_check(const duration& timeout); +}; + +} // namespace com::centreon::agent + +#endif diff --git a/agent/inc/com/centreon/agent/check_exec.hh b/agent/inc/com/centreon/agent/check_exec.hh new file mode 100644 index 00000000000..42107040c4a --- /dev/null +++ b/agent/inc/com/centreon/agent/check_exec.hh @@ -0,0 +1,117 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#ifndef CENTREON_AGENT_CHECK_EXEC_HH +#define CENTREON_AGENT_CHECK_EXEC_HH + +#include "check.hh" +#include "com/centreon/common/process.hh" + +namespace com::centreon::agent { + +class check_exec; + +namespace detail { + +/** + * @brief This class is used by check_exec class to execute plugins + * It calls check_exec::on_completion once process is ended AND we have received + * an eof on stdout pipe + * stderr pipe is not read as plugins should not use it + * As we are in asynchronous world, running index is carried until completion to + * ensure that completion is called for the right process and not for the + * previous one + */ +class process : public common::process { + bool _process_ended; + bool _stdout_eof; + std::string _stdout; + unsigned _running_index; + std::weak_ptr _parent; + + void _on_completion(); + + public: + process(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& cmd_line, + const std::shared_ptr& parent); + + void start(unsigned running_index); + + void kill() { common::process::kill(); } + + int get_exit_status() const { return common::process::get_exit_status(); } + + const std::string& get_stdout() const { return _stdout; } + + protected: + void on_stdout_read(const boost::system::error_code& err, + size_t nb_read) override; + void on_stderr_read(const boost::system::error_code& err, + size_t nb_read) override; + + void on_process_end(const boost::system::error_code& err, + int raw_exit_status) override; +}; + +} // namespace detail + +/** + * @brief check that executes a process (plugins) + * + */ +class check_exec : public check { + std::shared_ptr _process; + + protected: + using check::completion_handler; + + void _timeout_timer_handler(const boost::system::error_code& err, + unsigned start_check_index) override; + + void _init(); + + public: + check_exec(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& cmd_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + check::completion_handler&& handler); + + static std::shared_ptr load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& cmd_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + check::completion_handler&& handler); + + void start_check(const duration& timeout) override; + + void on_completion(unsigned running_index); +}; + +} // namespace com::centreon::agent + +#endif diff --git a/agent/inc/com/centreon/agent/config.hh b/agent/inc/com/centreon/agent/config.hh index 5df87e87901..d0bd774f97a 100644 --- a/agent/inc/com/centreon/agent/config.hh +++ b/agent/inc/com/centreon/agent/config.hh @@ -26,6 +26,8 @@ class config { public: enum log_type { to_stdout, to_file }; + static const std::string_view config_schema; + private: std::string _endpoint; spdlog::level::level_enum _log_level; @@ -35,7 +37,7 @@ class config { unsigned _log_files_max_number; bool _encryption; - std::string _certificate_file; + std::string _public_cert_file; std::string _private_key_file; std::string _ca_certificate_file; std::string _ca_name; @@ -53,7 +55,7 @@ class config { unsigned get_log_files_max_number() const { return _log_files_max_number; } bool use_encryption() const { return _encryption; } - const std::string& get_certificate_file() const { return _certificate_file; } + const std::string& get_public_cert_file() const { return _public_cert_file; } const std::string& get_private_key_file() const { return _private_key_file; } const std::string& get_ca_certificate_file() const { return _ca_certificate_file; diff --git a/agent/inc/com/centreon/agent/scheduler.hh b/agent/inc/com/centreon/agent/scheduler.hh new file mode 100644 index 00000000000..fcd3d71a6fa --- /dev/null +++ b/agent/inc/com/centreon/agent/scheduler.hh @@ -0,0 +1,209 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#ifndef CENTREON_AGENT_SCHEDULER_HH +#define CENTREON_AGENT_SCHEDULER_HH + +#include "check.hh" + +namespace com::centreon::agent { + +/** + * @brief the core of the agent + * It has to create check object with chck_builder passed in parameter of load + * method It sends metrics to engine and tries to spread checks over check + * period It also limits concurrent checks in order to limit system load + */ +class scheduler : public std::enable_shared_from_this { + public: + using metric_sender = + std::function&)>; + using check_builder = std::function( + const std::shared_ptr&, + const std::shared_ptr& /*logger*/, + time_point /* start expected*/, + const std::string& /*service*/, + const std::string& /*cmd_name*/, + const std::string& /*cmd_line*/, + const engine_to_agent_request_ptr& /*engine to agent request*/, + check::completion_handler&&)>; + + private: + using check_queue = std::set; + + check_queue _check_queue; + // running check counter that must not exceed max_concurrent_check + unsigned _active_check = 0; + bool _alive = true; + + // request that will be sent to engine + std::shared_ptr _current_request; + + // pointers in this struct point to _current_request + struct scope_metric_request { + ::opentelemetry::proto::metrics::v1::ScopeMetrics* scope_metric; + absl::flat_hash_map + metrics; + }; + + // one serv => one scope_metric => several metrics + absl::flat_hash_map _serv_to_scope_metrics; + + std::shared_ptr _io_context; + std::shared_ptr _logger; + // host declared in engine config + std::string _supervised_host; + metric_sender _metric_sender; + asio::system_timer _send_timer; + asio::system_timer _check_timer; + check_builder _check_builder; + // in order to send check_results at regular intervals, we work with absolute + // time points that we increment + time_point _next_send_time_point; + // last received configuration + engine_to_agent_request_ptr _conf; + + void _start(); + void _start_send_timer(); + void _send_timer_handler(const boost::system::error_code& err); + void _start_check_timer(); + void _check_timer_handler(const boost::system::error_code& err); + + void _init_export_request(); + void _start_check(const check::pointer& check); + void _check_handler( + const check::pointer& check, + unsigned status, + const std::list& perfdata, + const std::list& outputs); + void _store_result_in_metrics( + const check::pointer& check, + unsigned status, + const std::list& perfdata, + const std::list& outputs); + void _store_result_in_metrics_and_exemplars( + const check::pointer& check, + unsigned status, + const std::list& perfdata, + const std::list& outputs); + + scope_metric_request& _get_scope_metrics(const std::string& service); + + ::opentelemetry::proto::metrics::v1::Metric* _get_metric( + scope_metric_request& scope_metric, + const std::string& metric_name); + + void _add_metric_to_scope(uint64_t now, + const com::centreon::common::perfdata& perf, + scope_metric_request& scope_metric); + + void _add_exemplar( + const char* label, + double value, + ::opentelemetry::proto::metrics::v1::NumberDataPoint& data_point); + void _add_exemplar( + const char* label, + bool value, + ::opentelemetry::proto::metrics::v1::NumberDataPoint& data_point); + + void _start_waiting_check(); + + public: + template + scheduler(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_host, + const std::shared_ptr& config, + sender&& met_sender, + chck_builder&& builder); + + scheduler(const scheduler&) = delete; + scheduler operator=(const scheduler&) = delete; + + void update(const engine_to_agent_request_ptr& conf); + + static std::shared_ptr default_config(); + + template + static std::shared_ptr load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_host, + const std::shared_ptr& config, + sender&& met_sender, + chck_builder&& chk_builder); + + void stop(); + + engine_to_agent_request_ptr get_last_message_to_agent() const { + return _conf; + } +}; + +/** + * @brief Construct a new scheduler::scheduler object + * + * @tparam sender + * @param met_sender void(const export_metric_request_ptr&) called each time + * scheduler wants to send metrics to engine + * @param io_context + */ +template +scheduler::scheduler( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_host, + const std::shared_ptr& config, + sender&& met_sender, + chck_builder&& builder) + : _metric_sender(met_sender), + _io_context(io_context), + _logger(logger), + _supervised_host(supervised_host), + _send_timer(*io_context), + _check_timer(*io_context), + _check_builder(builder), + _conf(config) {} + +/** + * @brief create and start a new scheduler + * + * @tparam sender + * @param met_sender void(const export_metric_request_ptr&) called each time + * scheduler wants to send metrics to engine + * @return std::shared_ptr + */ +template +std::shared_ptr scheduler::load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& supervised_host, + const std::shared_ptr& config, + sender&& met_sender, + chck_builder&& chk_builder) { + std::shared_ptr to_start = std::make_shared( + io_context, logger, supervised_host, config, std::move(met_sender), + std::move(chk_builder)); + to_start->_start(); + return to_start; +} + +} // namespace com::centreon::agent + +#endif diff --git a/agent/src/check.cc b/agent/src/check.cc new file mode 100644 index 00000000000..562fd0329b2 --- /dev/null +++ b/agent/src/check.cc @@ -0,0 +1,141 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#include "check.hh" + +using namespace com::centreon::agent; + +/** + * @brief Construct a new check::check object + * + * @param io_context + * @param logger + * @param exp + * @param serv + * @param command_name + * @param cmd_line + * @param cnf + * @param handler + */ +check::check(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& command_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + completion_handler&& handler) + : _start_expected(exp), + _service(serv), + _command_name(command_name), + _command_line(cmd_line), + _conf(cnf), + _io_context(io_context), + _logger(logger), + _time_out_timer(*io_context), + _completion_handler(handler) {} + +/** + * @brief scheduler uses this method to increase start_expected + * + * @param to_add + */ +void check::add_duration_to_start_expected(const duration& to_add) { + _start_expected += to_add; +} + +/** + * @brief start a asynchronous check + * + * @param timeout + */ +void check::start_check(const duration& timeout) { + if (_running_check) { + SPDLOG_LOGGER_ERROR(_logger, "check for service {} is already running", + _service); + _io_context->post( + [me = shared_from_this(), to_call = _completion_handler]() { + to_call(me, 3, std::list(), + {"a check is already running"}); + }); + return; + } + // we refresh start expected in order that next call will occur at now + check + // period + _start_expected = std::chrono::system_clock::now(); + _running_check = true; + _start_timeout_timer(timeout); + SPDLOG_LOGGER_TRACE(_logger, "start check for service {}", _service); +} + +/** + * @brief start check timeout timer + * + * @param timeout + */ +void check::_start_timeout_timer(const duration& timeout) { + _time_out_timer.expires_from_now(timeout); + _time_out_timer.async_wait( + [me = shared_from_this(), start_check_index = _running_check_index]( + const boost::system::error_code& err) { + me->_timeout_timer_handler(err, start_check_index); + }); +} + +/** + * @brief timeout timer handler + * + * @param err + * @param start_check_index + */ +void check::_timeout_timer_handler(const boost::system::error_code& err, + unsigned start_check_index) { + if (err) { + return; + } + if (start_check_index == _running_check_index) { + SPDLOG_LOGGER_ERROR(_logger, "check timeout for service {}", _service); + on_completion(start_check_index, 3 /*unknown*/, + std::list(), + {"Timeout at execution of " + _command_line}); + } +} + +/** + * @brief called when check is ended + * _running_check is increased so that next check will be identified by this new + * id. We also cancel timeout timer + * + * @param start_check_index + * @param status + * @param perfdata + * @param outputs + */ +void check::on_completion( + unsigned start_check_index, + unsigned status, + const std::list& perfdata, + const std::list& outputs) { + if (start_check_index == _running_check_index) { + SPDLOG_LOGGER_TRACE(_logger, "end check for service {}", _service); + _time_out_timer.cancel(); + _running_check = false; + ++_running_check_index; + _completion_handler(shared_from_this(), status, perfdata, outputs); + } +} diff --git a/agent/src/check_exec.cc b/agent/src/check_exec.cc new file mode 100644 index 00000000000..d38d0deeac9 --- /dev/null +++ b/agent/src/check_exec.cc @@ -0,0 +1,266 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#include "check_exec.hh" + +using namespace com::centreon::agent; + +/** + * @brief Construct a new detail::process::process object + * + * @param io_context + * @param logger + * @param cmd_line + * @param parent + */ +detail::process::process(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + const std::string& cmd_line, + const std::shared_ptr& parent) + : common::process(io_context, logger, cmd_line), _parent(parent) {} + +/** + * @brief start a new process, if a previous one is already running, it's killed + * + * @param running_index + */ +void detail::process::start(unsigned running_index) { + _process_ended = false; + _stdout_eof = false; + _running_index = running_index; + _stdout.clear(); + common::process::start_process(); +} + +/** + * @brief son process stdout read handler + * + * @param err + * @param nb_read + */ +void detail::process::on_stdout_read(const boost::system::error_code& err, + size_t nb_read) { + if (!err && nb_read > 0) { + _stdout.append(_stdout_read_buffer, nb_read); + } else if (err == asio::error::eof) { + _stdout_eof = true; + _on_completion(); + } + common::process::on_stdout_read(err, nb_read); +} + +/** + * @brief son process stderr read handler + * + * @param err + * @param nb_read + */ +void detail::process::on_stderr_read(const boost::system::error_code& err, + size_t nb_read) { + if (!err) { + SPDLOG_LOGGER_ERROR(_logger, "process error: {}", + std::string_view(_stderr_read_buffer, nb_read)); + } + common::process::on_stderr_read(err, nb_read); +} + +/** + * @brief called when son process ends + * + * @param err + * @param raw_exit_status + */ +void detail::process::on_process_end(const boost::system::error_code& err, + int raw_exit_status) { + if (err) { + _stdout += fmt::format("fail to execute process {} : {}", get_exe_path(), + err.message()); + } + common::process::on_process_end(err, raw_exit_status); + _process_ended = true; + _on_completion(); +} + +/** + * @brief if both stdout read and process are terminated, we call + * check_exec::on_completion + * + */ +void detail::process::_on_completion() { + if (_stdout_eof && _process_ended) { + std::shared_ptr parent = _parent.lock(); + if (parent) { + parent->on_completion(_running_index); + } + } +} + +/****************************************************************** + * check_exec + ******************************************************************/ + +check_exec::check_exec(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& cmd_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + check::completion_handler&& handler) + : check(io_context, + logger, + exp, + serv, + cmd_name, + cmd_line, + cnf, + std::move(handler)) {} + +/** + * @brief create and initialize a check_exec object (don't use constructor) + * + * @tparam handler_type + * @param io_context + * @param logger + * @param exp start expected + * @param serv + * @param cmd_name + * @param cmd_line + * @param cnf agent configuration + * @param handler completion handler + * @return std::shared_ptr + */ +std::shared_ptr check_exec::load( + const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& cmd_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + check::completion_handler&& handler) { + std::shared_ptr ret = + std::make_shared(io_context, logger, exp, serv, cmd_name, + cmd_line, cnf, std::move(handler)); + ret->_init(); + return ret; +} + +/** + * @brief to call after construction + * constructor mustn't be called, use check_exec::load instead + * + */ +void check_exec::_init() { + try { + _process = std::make_shared( + _io_context, _logger, get_command_line(), + std::static_pointer_cast(shared_from_this())); + } catch (const std::exception& e) { + SPDLOG_LOGGER_ERROR(_logger, "fail to create process of cmd_line '{}' : {}", + get_command_line(), e.what()); + } +} + +/** + * @brief start a check, completion handler is always called asynchronously even + * in case of failure + * + * @param timeout + */ +void check_exec::start_check(const duration& timeout) { + check::start_check(timeout); + if (!_process) { + _io_context->post([me = check::shared_from_this(), + start_check_index = _get_running_check_index()]() { + me->on_completion(start_check_index, 3, + std::list(), + {"empty command"}); + }); + } + + try { + _process->start(_get_running_check_index()); + } catch (const boost::system::system_error& e) { + SPDLOG_LOGGER_ERROR(_logger, " serv {} fail to execute {}: {}", + get_service(), get_command_line(), e.code().message()); + _io_context->post([me = check::shared_from_this(), + start_check_index = _get_running_check_index(), e]() { + me->on_completion( + start_check_index, 3, std::list(), + {fmt::format("Fail to execute {} : {}", me->get_command_line(), + e.code().message())}); + }); + } catch (const std::exception& e) { + SPDLOG_LOGGER_ERROR(_logger, " serv {} fail to execute {}: {}", + get_service(), get_command_line(), e.what()); + _io_context->post([me = check::shared_from_this(), + start_check_index = _get_running_check_index(), e]() { + me->on_completion(start_check_index, 3, + std::list(), + {fmt::format("Fail to execute {} : {}", + me->get_command_line(), e.what())}); + }); + } +} + +/** + * @brief process is killed in case of timeout and handler is called + * + * @param err + * @param start_check_index + */ +void check_exec::_timeout_timer_handler(const boost::system::error_code& err, + unsigned start_check_index) { + if (err) { + return; + } + if (start_check_index == _get_running_check_index()) { + check::_timeout_timer_handler(err, start_check_index); + _process->kill(); + } +} + +/** + * @brief called on process completion + * + * @param running_index + */ +void check_exec::on_completion(unsigned running_index) { + if (running_index != _get_running_check_index()) { + return; + } + + std::list outputs; + std::list perfs; + + // split multi line output + outputs = absl::StrSplit(_process->get_stdout(), '\n', absl::SkipEmpty()); + if (!outputs.empty()) { + const std::string& first_line = *outputs.begin(); + size_t pipe_pos = first_line.find('|'); + if (pipe_pos != std::string::npos) { + std::string perfdatas = outputs.begin()->substr(pipe_pos + 1); + boost::trim(perfdatas); + perfs = com::centreon::common::perfdata::parse_perfdata( + 0, 0, perfdatas.c_str(), _logger); + } + } + check::on_completion(running_index, _process->get_exit_status(), perfs, + outputs); +} diff --git a/agent/src/config.cc b/agent/src/config.cc index b47681037b9..cd46ce23742 100644 --- a/agent/src/config.cc +++ b/agent/src/config.cc @@ -26,7 +26,7 @@ using namespace com::centreon::agent; using com::centreon::common::rapidjson_helper; -static constexpr std::string_view _config_schema(R"( +const std::string_view config::config_schema(R"( { "$schema": "http://json-schema.org/draft-04/schema#", "title": "agent config", @@ -100,7 +100,7 @@ static constexpr std::string_view _config_schema(R"( )"); config::config(const std::string& path) { - static common::json_validator validator(_config_schema); + static common::json_validator validator(config_schema); rapidjson::Document file_content_d; try { file_content_d = rapidjson_helper::read_from_file(path); @@ -136,13 +136,13 @@ config::config(const std::string& path) { _log_files_max_size = json_config.get_unsigned("log_files_max_size", 0); _log_files_max_number = json_config.get_unsigned("log_files_max_number", 0); _encryption = json_config.get_bool("encryption", false); - _certificate_file = json_config.get_string("certificate_file", ""); - _private_key_file = json_config.get_string("private_key_file", ""); - _ca_certificate_file = json_config.get_string("ca_certificate_file", ""); + _public_cert_file = json_config.get_string("public_cert", ""); + _private_key_file = json_config.get_string("private_key", ""); + _ca_certificate_file = json_config.get_string("ca_certificate", ""); _ca_name = json_config.get_string("ca_name", ""); _host = json_config.get_string("host", ""); if (_host.empty()) { _host = boost::asio::ip::host_name(); } _reverse_connection = json_config.get_bool("reverse_connection", false); -} \ No newline at end of file +} diff --git a/agent/src/main.cc b/agent/src/main.cc index d78e1dc80ba..562a1f05e46 100644 --- a/agent/src/main.cc +++ b/agent/src/main.cc @@ -71,8 +71,7 @@ static std::string read_file(const std::string& file_path) { return ss.str(); } } catch (const std::exception& e) { - SPDLOG_LOGGER_ERROR(g_logger, "{} fail to read {}: {}", file_path, - e.what()); + SPDLOG_LOGGER_ERROR(g_logger, "fail to read {}: {}", file_path, e.what()); } return ""; } @@ -85,6 +84,14 @@ int main(int argc, char* argv[]) { return 1; } + if (!strcmp(argv[1], "--help")) { + SPDLOG_INFO( + "Usage: {} \nSchema of the config " + "file is:\n{}", + argv[0], config::config_schema); + return 1; + } + std::unique_ptr conf; try { conf = std::make_unique(argv[1]); @@ -131,6 +138,10 @@ int main(int argc, char* argv[]) { g_logger->set_level(conf->get_log_level()); + g_logger->flush_on(spdlog::level::warn); + + spdlog::flush_every(std::chrono::seconds(1)); + SPDLOG_LOGGER_INFO(g_logger, "centreon-monitoring-agent start, you can decrease log " "verbosity by kill -USR1 {} or increase by kill -USR2 {}", @@ -145,7 +156,7 @@ int main(int argc, char* argv[]) { grpc_conf = std::make_shared( conf->get_endpoint(), conf->use_encryption(), - read_file(conf->get_certificate_file()), + read_file(conf->get_public_cert_file()), read_file(conf->get_private_key_file()), read_file(conf->get_ca_certificate_file()), conf->get_ca_name(), true, 30); @@ -165,4 +176,4 @@ int main(int argc, char* argv[]) { SPDLOG_LOGGER_INFO(g_logger, "centreon-monitoring-agent end"); return 0; -} \ No newline at end of file +} diff --git a/agent/src/scheduler.cc b/agent/src/scheduler.cc new file mode 100644 index 00000000000..207ef35721e --- /dev/null +++ b/agent/src/scheduler.cc @@ -0,0 +1,485 @@ +/** + * Copyright 2024 Centreon + * + * This file is part of Centreon Agent. + * + * Centreon Engine is free software: you can redistribute it and/or + * modify it under the terms of the GNU General Public License version 2 + * as published by the Free Software Foundation. + * + * Centreon Engine is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Centreon Engine. If not, see + * . + */ + +#include "scheduler.hh" + +using namespace com::centreon::agent; + +/** + * @brief to call after creation + * it create a default configuration with no check and start send timer + */ +void scheduler::_start() { + _init_export_request(); + _next_send_time_point = std::chrono::system_clock::now(); + update(_conf); + _start_send_timer(); + _start_check_timer(); +} + +/** + * @brief start periodic metric sent to engine + * + */ +void scheduler::_start_send_timer() { + _next_send_time_point += + std::chrono::seconds(_conf->config().export_period()); + _send_timer.expires_at(_next_send_time_point); + _send_timer.async_wait( + [me = shared_from_this()](const boost::system::error_code& err) { + me->_send_timer_handler(err); + }); +} + +/** + * @brief send all check results to engine + * + * @param err + */ +void scheduler::_send_timer_handler(const boost::system::error_code& err) { + if (err) { + return; + } + if (_current_request->mutable_otel_request()->resource_metrics_size() > 0) { + _metric_sender(_current_request); + _init_export_request(); + } + _start_send_timer(); +} + +/** + * @brief create export request and fill some attributes + * + */ +void scheduler::_init_export_request() { + _current_request = std::make_shared(); + _serv_to_scope_metrics.clear(); +} + +/** + * @brief create a default empty configuration to scheduler + * + */ +std::shared_ptr +scheduler::default_config() { + std::shared_ptr ret = + std::make_shared(); + ret->mutable_config()->set_check_interval(1); + ret->mutable_config()->set_export_period(1); + ret->mutable_config()->set_max_concurrent_checks(10); + return ret; +} + +/** + * @brief start check timer. + * When it will expire, we will call every check whose start_expected is lower + * than the actual time point + * if no check available, we start timer for 100ms + * + */ +void scheduler::_start_check_timer() { + if (_check_queue.empty() || + _active_check >= _conf->config().max_concurrent_checks()) { + _check_timer.expires_from_now(std::chrono::milliseconds(100)); + } else { + _check_timer.expires_at((*_check_queue.begin())->get_start_expected()); + } + _check_timer.async_wait( + [me = shared_from_this()](const boost::system::error_code& err) { + me->_check_timer_handler(err); + }); +} + +/** + * @brief check timer handler + * + * @param err + */ +void scheduler::_check_timer_handler(const boost::system::error_code& err) { + if (err) { + return; + } + _start_waiting_check(); + _start_check_timer(); +} + +/** + * @brief start all waiting checks, no more concurrent checks than + * max_concurrent_checks + * check started are removed from queue and will be inserted once completed + */ +void scheduler::_start_waiting_check() { + time_point now = std::chrono::system_clock::now(); + if (!_check_queue.empty()) { + for (check_queue::iterator to_check = _check_queue.begin(); + !_check_queue.empty() && to_check != _check_queue.end() && + (*to_check)->get_start_expected() <= now && + _active_check < _conf->config().max_concurrent_checks();) { + _start_check(*to_check); + to_check = _check_queue.erase(to_check); + } + } +} + +/** + * @brief called when we receive a new configuration + * It initialize check queue and restart all checks schedule + * running checks stay alive but their completion will not be handled + * We compute start_expected of checks in order to spread checks over + * check_interval + * @param conf + */ +void scheduler::update(const engine_to_agent_request_ptr& conf) { + _check_queue.clear(); + _active_check = 0; + size_t nb_check = conf->config().services().size(); + + if (conf->config().check_interval() <= 0) { + SPDLOG_LOGGER_ERROR( + _logger, "check_interval cannot be null => no configuration update"); + return; + } + + SPDLOG_LOGGER_INFO(_logger, "schedule {} checks to execute in {}s", nb_check, + conf->config().check_interval()); + + if (nb_check > 0) { + duration check_interval = + std::chrono::microseconds(conf->config().check_interval() * 1000000) / + nb_check; + + time_point next = std::chrono::system_clock::now(); + for (const auto& serv : conf->config().services()) { + if (_logger->level() == spdlog::level::trace) { + SPDLOG_LOGGER_TRACE( + _logger, "check expected to start at {} for service {} command {}", + next, serv.service_description(), serv.command_line()); + } else { + SPDLOG_LOGGER_TRACE(_logger, + "check expected to start at {} for service {}", + next, serv.service_description()); + } + _check_queue.emplace(_check_builder( + _io_context, _logger, next, serv.service_description(), + serv.command_name(), serv.command_line(), conf, + [me = shared_from_this()]( + const std::shared_ptr& check, unsigned status, + const std::list& perfdata, + const std::list& outputs) { + me->_check_handler(check, status, perfdata, outputs); + })); + next += check_interval; + } + } + + _conf = conf; +} + +/** + * @brief start a check + * + * @param check + */ +void scheduler::_start_check(const check::pointer& check) { + ++_active_check; + if (_logger->level() <= spdlog::level::trace) { + SPDLOG_LOGGER_TRACE(_logger, "start check for service {} command {}", + check->get_service(), check->get_command_line()); + } else { + SPDLOG_LOGGER_DEBUG(_logger, "start check for service {}", + check->get_service()); + } + check->start_check(std::chrono::seconds(_conf->config().check_timeout())); +} + +/** + * @brief completion check handler + * if conf has been updated during check, it does nothing + * + * @param check + * @param status + * @param perfdata + * @param outputs + */ +void scheduler::_check_handler( + const check::pointer& check, + unsigned status, + const std::list& perfdata, + const std::list& outputs) { + SPDLOG_LOGGER_TRACE(_logger, "end check for service {} command {}", + check->get_service(), check->get_command_line()); + + // conf has changed => no repush for next check + if (check->get_conf() != _conf) { + return; + } + + if (_conf->config().use_exemplar()) { + _store_result_in_metrics_and_exemplars(check, status, perfdata, outputs); + } else { + _store_result_in_metrics(check, status, perfdata, outputs); + } + + --_active_check; + + if (_alive) { + // repush for next check + check->add_duration_to_start_expected( + std::chrono::seconds(_conf->config().check_interval())); + + _check_queue.insert(check); + // we have decreased _active_check, so we can launch another check + _start_waiting_check(); + } +} + +/** + * @brief to call on process termination or accepted connection error + * + */ +void scheduler::stop() { + if (_alive) { + _alive = false; + _send_timer.cancel(); + _check_timer.cancel(); + } +} + +/** + * @brief stores results in telegraf manner + * + * @param check + * @param status + * @param perfdata + * @param outputs + */ +void scheduler::_store_result_in_metrics( + const check::pointer& check, + unsigned status, + const std::list& perfdata, + const std::list& outputs) { + // auto scope_metrics = + // get_scope_metrics(check->get_host(), check->get_service()); + // unsigned now = std::chrono::duration_cast( + // std::chrono::system_clock::now().time_since_epoch()) + // .count(); + + // auto state_metrics = scope_metrics->add_metrics(); + // state_metrics->set_name(check->get_command_name() + "_state"); + // if (!outputs.empty()) { + // const std::string& first_line = *outputs.begin(); + // size_t pipe_pos = first_line.find('|'); + // state_metrics->set_description(pipe_pos != std::string::npos + // ? first_line.substr(0, pipe_pos) + // : first_line); + // } + // auto data_point = state_metrics->mutable_gauge()->add_data_points(); + // data_point->set_time_unix_nano(now); + // data_point->set_as_int(status); + + // we aggregate perfdata results by type (min, max, ) +} + +/** + * @brief store results with centreon sauce + * + * @param check + * @param status + * @param perfdata + * @param outputs + */ +void scheduler::_store_result_in_metrics_and_exemplars( + const check::pointer& check, + unsigned status, + const std::list& perfdata, + const std::list& outputs) { + auto& scope_metrics = _get_scope_metrics(check->get_service()); + uint64_t now = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + auto state_metrics = _get_metric(scope_metrics, "status"); + if (!outputs.empty()) { + const std::string& first_line = *outputs.begin(); + size_t pipe_pos = first_line.find('|'); + state_metrics->set_description(pipe_pos != std::string::npos + ? first_line.substr(0, pipe_pos) + : first_line); + } + auto data_point = state_metrics->mutable_gauge()->add_data_points(); + data_point->set_time_unix_nano(now); + data_point->set_as_int(status); + + for (const com::centreon::common::perfdata& perf : perfdata) { + _add_metric_to_scope(now, perf, scope_metrics); + } +} + +/** + * @brief metrics are grouped by host service + * (one resource_metrics by host serv pair) + * + * @param service + * @return scheduler::scope_metric_request& + */ +scheduler::scope_metric_request& scheduler::_get_scope_metrics( + const std::string& service) { + auto exist = _serv_to_scope_metrics.find(service); + if (exist != _serv_to_scope_metrics.end()) { + return exist->second; + } + ::opentelemetry::proto::metrics::v1::ResourceMetrics* new_res = + _current_request->mutable_otel_request()->add_resource_metrics(); + + auto* host_attrib = new_res->mutable_resource()->add_attributes(); + host_attrib->set_key("host.name"); + host_attrib->mutable_value()->set_string_value(_supervised_host); + auto* serv_attrib = new_res->mutable_resource()->add_attributes(); + serv_attrib->set_key("service.name"); + serv_attrib->mutable_value()->set_string_value(service); + + ::opentelemetry::proto::metrics::v1::ScopeMetrics* new_scope = + new_res->add_scope_metrics(); + + scope_metric_request to_insert; + to_insert.scope_metric = new_scope; + + return _serv_to_scope_metrics.emplace(service, to_insert).first->second; +} + +/** + * @brief one metric by metric name (can contains several datapoints in case of + * multiple checks during send period ) + * + * @param scope_metric + * @param metric_name + * @return ::opentelemetry::proto::metrics::v1::Metric* + */ +::opentelemetry::proto::metrics::v1::Metric* scheduler::_get_metric( + scope_metric_request& scope_metric, + const std::string& metric_name) { + auto exist = scope_metric.metrics.find(metric_name); + if (exist != scope_metric.metrics.end()) { + return exist->second; + } + + ::opentelemetry::proto::metrics::v1::Metric* new_metric = + scope_metric.scope_metric->add_metrics(); + new_metric->set_name(metric_name); + + scope_metric.metrics.emplace(metric_name, new_metric); + + return new_metric; +} + +/** + * @brief add a perfdata to metric + * + * @param now + * @param perf + * @param scope_metric + */ +void scheduler::_add_metric_to_scope( + uint64_t now, + const com::centreon::common::perfdata& perf, + scope_metric_request& scope_metric) { + auto metric = _get_metric(scope_metric, perf.name()); + metric->set_unit(perf.unit()); + auto data_point = metric->mutable_gauge()->add_data_points(); + data_point->set_as_int(perf.value()); + data_point->set_time_unix_nano(now); + switch (perf.value_type()) { + case com::centreon::common::perfdata::counter: { + auto attrib_type = data_point->add_attributes(); + attrib_type->set_key("counter"); + break; + } + case com::centreon::common::perfdata::derive: { + auto attrib_type = data_point->add_attributes(); + attrib_type->set_key("derive"); + break; + } + case com::centreon::common::perfdata::absolute: { + auto attrib_type = data_point->add_attributes(); + attrib_type->set_key("absolute"); + break; + } + case com::centreon::common::perfdata::automatic: { + auto attrib_type = data_point->add_attributes(); + attrib_type->set_key("auto"); + break; + } + } + if (perf.critical() <= std::numeric_limits::max()) { + _add_exemplar(perf.critical_mode() ? "crit_ge" : "crit_gt", perf.critical(), + *data_point); + } + if (perf.critical_low() <= std::numeric_limits::max()) { + _add_exemplar(perf.critical_mode() ? "crit_le" : "crit_lt", + perf.critical_low(), *data_point); + } + if (perf.warning() <= std::numeric_limits::max()) { + _add_exemplar(perf.warning_mode() ? "warn_ge" : "warn_gt", perf.warning(), + *data_point); + } + if (perf.warning_low() <= std::numeric_limits::max()) { + _add_exemplar(perf.critical_mode() ? "warn_le" : "warn_lt", + perf.warning_low(), *data_point); + } + if (perf.min() <= std::numeric_limits::max()) { + _add_exemplar("min", perf.min(), *data_point); + } + if (perf.max() <= std::numeric_limits::max()) { + _add_exemplar("max", perf.max(), *data_point); + } +} + +/** + * @brief add an exemplar to metric such as crit_le, min, max.. + * + * @param label + * @param value + * @param data_point + */ +void scheduler::_add_exemplar( + const char* label, + double value, + ::opentelemetry::proto::metrics::v1::NumberDataPoint& data_point) { + auto exemplar = data_point.add_exemplars(); + auto attrib = exemplar->add_filtered_attributes(); + attrib->set_key(label); + exemplar->set_as_double(value); +} + +/** + * @brief add an exemplar to metric such as crit_le, min, max.. + * + * @param label + * @param value + * @param data_point + */ +void scheduler::_add_exemplar( + const char* label, + bool value, + ::opentelemetry::proto::metrics::v1::NumberDataPoint& data_point) { + auto exemplar = data_point.add_exemplars(); + auto attrib = exemplar->add_filtered_attributes(); + attrib->set_key(label); + exemplar->set_as_int(value); +} diff --git a/agent/test/CMakeLists.txt b/agent/test/CMakeLists.txt index c66ffde4ece..c677ecc9c93 100644 --- a/agent/test/CMakeLists.txt +++ b/agent/test/CMakeLists.txt @@ -20,6 +20,9 @@ add_executable(ut_agent config_test.cc + check_test.cc + check_exec_test.cc + scheduler_test.cc test_main.cc ${TESTS_SOURCES}) @@ -37,6 +40,7 @@ set_target_properties( target_link_libraries(ut_agent PRIVATE centagent_lib centreon_common + centreon_process GTest::gtest GTest::gtest_main GTest::gmock diff --git a/agent/test/check_exec_test.cc b/agent/test/check_exec_test.cc new file mode 100644 index 00000000000..34c050f48e0 --- /dev/null +++ b/agent/test/check_exec_test.cc @@ -0,0 +1,113 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#include + +#include "check_exec.hh" + +using namespace com::centreon::agent; + +extern std::shared_ptr g_io_context; + +static const std::string serv("serv"); +static const std::string cmd_name("command"); +static std::string command_line; + +TEST(check_exec_test, echo) { + command_line = "/bin/echo hello toto"; + int status; + std::list outputs; + std::condition_variable cond; + std::shared_ptr check = check_exec::load( + g_io_context, spdlog::default_logger(), time_point(), serv, cmd_name, + command_line, engine_to_agent_request_ptr(), + [&](const std::shared_ptr& caller, + int statuss, + const std::list& perfdata, + const std::list& output) { + status = statuss; + outputs = output; + cond.notify_one(); + }); + check->start_check(std::chrono::seconds(1)); + + std::mutex mut; + std::unique_lock l(mut); + cond.wait(l); + ASSERT_EQ(status, 0); + ASSERT_EQ(outputs.size(), 1); + ASSERT_EQ(*outputs.begin(), "hello toto"); +} + +TEST(check_exec_test, timeout) { + command_line = "/bin/sleep 5"; + int status; + std::list outputs; + std::condition_variable cond; + std::shared_ptr check = check_exec::load( + g_io_context, spdlog::default_logger(), time_point(), serv, cmd_name, + command_line, engine_to_agent_request_ptr(), + [&](const std::shared_ptr& caller, + int statuss, + const std::list& perfdata, + const std::list& output) { + status = statuss; + outputs = output; + cond.notify_one(); + }); + check->start_check(std::chrono::seconds(1)); + + std::mutex mut; + std::unique_lock l(mut); + cond.wait(l); + ASSERT_EQ(status, 3); + ASSERT_EQ(outputs.size(), 1); + ASSERT_EQ(*outputs.begin(), "Timeout at execution of /bin/sleep 5"); +} + +TEST(check_exec_test, bad_command) { + command_line = "/usr/bad_path/turlututu titi toto"; + int status; + std::list outputs; + std::condition_variable cond; + std::mutex mut; + std::shared_ptr check = check_exec::load( + g_io_context, spdlog::default_logger(), time_point(), serv, cmd_name, + command_line, engine_to_agent_request_ptr(), + [&](const std::shared_ptr& caller, + int statuss, + const std::list& perfdata, + const std::list& output) { + { + std::lock_guard l(mut); + status = statuss; + outputs = output; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + cond.notify_one(); + }); + check->start_check(std::chrono::seconds(1)); + + std::unique_lock l(mut); + cond.wait(l); + ASSERT_EQ(status, 3); + ASSERT_EQ(outputs.size(), 1); + ASSERT_EQ(*outputs.begin(), + "Fail to execute /usr/bad_path/turlututu titi toto : No such file " + "or directory"); +} diff --git a/agent/test/check_test.cc b/agent/test/check_test.cc new file mode 100644 index 00000000000..1a09b0761cf --- /dev/null +++ b/agent/test/check_test.cc @@ -0,0 +1,141 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#include + +#include "check.hh" + +using namespace com::centreon::agent; + +extern std::shared_ptr g_io_context; + +class dummy_check : public check { + duration _command_duration; + asio::system_timer _command_timer; + + public: + void start_check(const duration& timeout) override { + check::start_check(timeout); + _command_timer.expires_from_now(_command_duration); + _command_timer.async_wait([me = shared_from_this(), this, + running_index = _get_running_check_index()]( + const boost::system::error_code& err) { + if (err) { + return; + } + on_completion(running_index, 1, + std::list(), + {"output dummy_check of " + get_command_line()}); + }); + } + + template + dummy_check(const std::string& serv, + const std::string& command_name, + const std::string& command_line, + const duration& command_duration, + handler_type&& handler) + : check(g_io_context, + spdlog::default_logger(), + std::chrono::system_clock::now(), + serv, + command_name, + command_line, + nullptr, + handler), + _command_duration(command_duration), + _command_timer(*g_io_context) {} +}; + +static std::string serv("my_serv"); +static std::string cmd_name("my_command_name"); +static std::string cmd_line("my_command_line"); + +TEST(check_test, timeout) { + unsigned status = 0; + std::string output; + std::mutex cond_m; + std::condition_variable cond; + unsigned handler_call_cpt = 0; + + std::shared_ptr checker = std::make_shared( + serv, cmd_name, cmd_line, std::chrono::milliseconds(500), + [&status, &output, &handler_call_cpt, &cond]( + const std::shared_ptr&, unsigned statuss, + const std::list& perfdata, + const std::list& outputs) { + status = statuss; + if (outputs.size() == 1) { + output = *outputs.begin(); + } + ++handler_call_cpt; + cond.notify_all(); + }); + + checker->start_check(std::chrono::milliseconds(100)); + + std::unique_lock l(cond_m); + cond.wait(l); + + ASSERT_EQ(status, 3); + ASSERT_EQ(handler_call_cpt, 1); + ASSERT_EQ(output, "Timeout at execution of my_command_line"); + + // completion handler not called twice + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(status, 3); + ASSERT_EQ(handler_call_cpt, 1); + ASSERT_EQ(output, "Timeout at execution of my_command_line"); +} + +TEST(check_test, no_timeout) { + unsigned status = 0; + std::string output; + std::mutex cond_m; + std::condition_variable cond; + unsigned handler_call_cpt = 0; + + std::shared_ptr checker = std::make_shared( + serv, cmd_name, cmd_line, std::chrono::milliseconds(100), + [&status, &output, &handler_call_cpt, &cond]( + const std::shared_ptr&, unsigned statuss, + const std::list& perfdata, + const std::list& outputs) { + status = statuss; + if (outputs.size() == 1) { + output = *outputs.begin(); + } + ++handler_call_cpt; + cond.notify_all(); + }); + + checker->start_check(std::chrono::milliseconds(200)); + + std::unique_lock l(cond_m); + cond.wait(l); + + ASSERT_EQ(status, 1); + ASSERT_EQ(handler_call_cpt, 1); + ASSERT_EQ(output, "output dummy_check of my_command_line"); + + // completion handler not called twice + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(status, 1); + ASSERT_EQ(handler_call_cpt, 1); + ASSERT_EQ(output, "output dummy_check of my_command_line"); +} \ No newline at end of file diff --git a/agent/test/scheduler_test.cc b/agent/test/scheduler_test.cc new file mode 100644 index 00000000000..ccd9f47a7fc --- /dev/null +++ b/agent/test/scheduler_test.cc @@ -0,0 +1,450 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ + +#include + +#include "scheduler.hh" + +extern std::shared_ptr g_io_context; +using namespace com::centreon::agent; + +class tempo_check : public check { + asio::system_timer _completion_timer; + int _command_exit_status; + duration _completion_delay; + + public: + static std::vector> check_starts; + static std::mutex check_starts_m; + + static uint64_t completion_time; + + tempo_check(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& cmd_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + int command_exit_status, + duration completion_delay, + check::completion_handler&& handler) + : check(io_context, + logger, + exp, + serv, + cmd_name, + cmd_line, + cnf, + std::move(handler)), + _completion_timer(*io_context), + _command_exit_status(command_exit_status), + _completion_delay(completion_delay) {} + + void start_check(const duration& timeout) override { + { + std::lock_guard l(check_starts_m); + check_starts.emplace_back(this, std::chrono::system_clock::now()); + } + check::start_check(timeout); + _completion_timer.expires_from_now(_completion_delay); + _completion_timer.async_wait([me = shared_from_this(), this, + check_running_index = + _get_running_check_index()]( + const boost::system::error_code& err) { + SPDLOG_TRACE("end of completion timer for serv {}", get_service()); + me->on_completion( + check_running_index, _command_exit_status, + com::centreon::common::perfdata::parse_perfdata( + 0, 0, + "rta=0,031ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,109ms;;;; " + "rtmin=0,011ms;;;;", + _logger), + {fmt::format("Command OK: {}", me->get_command_line())}); + completion_time = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + }); + } +}; + +std::vector> tempo_check::check_starts; +std::mutex tempo_check::check_starts_m; +uint64_t tempo_check::completion_time; + +class scheduler_test : public ::testing::Test { + public: + static void SetUpTestSuite() { + spdlog::default_logger()->set_level(spdlog::level::trace); + } + + std::shared_ptr create_conf( + unsigned nb_serv, + unsigned second_check_period, + unsigned export_period, + unsigned max_concurent_check, + unsigned check_timeout); +}; + +std::shared_ptr +scheduler_test::create_conf(unsigned nb_serv, + unsigned second_check_period, + unsigned export_period, + unsigned max_concurent_check, + unsigned check_timeout) { + std::shared_ptr conf = + std::make_shared(); + auto cnf = conf->mutable_config(); + cnf->set_check_interval(second_check_period); + cnf->set_export_period(export_period); + cnf->set_max_concurrent_checks(max_concurent_check); + cnf->set_check_timeout(check_timeout); + cnf->set_use_exemplar(true); + for (unsigned serv_index = 0; serv_index < nb_serv; ++serv_index) { + auto serv = cnf->add_services(); + serv->set_service_description(fmt::format("serv{}", serv_index + 1)); + serv->set_command_name(fmt::format("command{}", serv_index + 1)); + serv->set_command_line("/usr/bin/ls"); + } + return conf; +} + +TEST_F(scheduler_test, no_config) { + std::shared_ptr sched = scheduler::load( + g_io_context, spdlog::default_logger(), "my_host", + scheduler::default_config(), + [](const std::shared_ptr&) {}, + [](const std::shared_ptr&, + const std::shared_ptr&, time_point /* start expected*/, + const std::string& /*service*/, const std::string& /*cmd_name*/, + const std::string& /*cmd_line*/, + const engine_to_agent_request_ptr& /*engine to agent request*/, + check::completion_handler&&) { return std::shared_ptr(); }); + + std::weak_ptr weak_shed(sched); + sched.reset(); + + // scheduler must be owned by asio + ASSERT_TRUE(weak_shed.lock()); + + weak_shed.lock()->stop(); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + ASSERT_FALSE(weak_shed.lock()); +} + +TEST_F(scheduler_test, correct_schedule) { + std::shared_ptr sched = scheduler::load( + g_io_context, spdlog::default_logger(), "my_host", + create_conf(20, 1, 1, 50, 1), + [](const std::shared_ptr&) {}, + [](const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point start_expected, const std::string& service, + const std::string& cmd_name, const std::string& cmd_line, + const engine_to_agent_request_ptr& engine_to_agent_request, + check::completion_handler&& handler) { + return std::make_shared( + io_context, logger, start_expected, service, cmd_name, cmd_line, + engine_to_agent_request, 0, std::chrono::milliseconds(50), + std::move(handler)); + }); + + { + std::lock_guard l(tempo_check::check_starts_m); + tempo_check::check_starts.clear(); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1100)); + + // we have 2 * 10 = 20 checks spread over 1 second + duration expected_interval = std::chrono::milliseconds(50); + + { + std::lock_guard l(tempo_check::check_starts_m); + ASSERT_GE(tempo_check::check_starts.size(), 20); + bool first = true; + std::pair previous; + for (const auto& check_time : tempo_check::check_starts) { + if (first) { + first = false; + } else { + ASSERT_NE(previous.first, check_time.first); + ASSERT_GT((check_time.second - previous.second), + expected_interval - std::chrono::milliseconds(1)); + ASSERT_LT((check_time.second - previous.second), + expected_interval + std::chrono::milliseconds(1)); + } + previous = check_time; + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + + { + std::lock_guard l(tempo_check::check_starts_m); + ASSERT_GE(tempo_check::check_starts.size(), 40); + bool first = true; + std::pair previous; + for (const auto& check_time : tempo_check::check_starts) { + if (first) { + first = false; + } else { + ASSERT_NE(previous.first, check_time.first); + ASSERT_TRUE((check_time.second - previous.second) > + expected_interval - std::chrono::milliseconds(1)); + ASSERT_TRUE((check_time.second - previous.second) < + expected_interval + std::chrono::milliseconds(1)); + } + previous = check_time; + } + } + + sched->stop(); +} + +TEST_F(scheduler_test, time_out) { + std::shared_ptr exported_request; + std::condition_variable export_cond; + uint64_t expected_completion_time = + std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + std::mutex m; + std::shared_ptr sched = scheduler::load( + g_io_context, spdlog::default_logger(), "my_host", + create_conf(1, 1, 1, 1, 1), + [&](const std::shared_ptr& req) { + { + std::lock_guard l(m); + exported_request = req; + } + export_cond.notify_all(); + }, + [](const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point start_expected, const std::string& service, + const std::string& cmd_name, const std::string& cmd_line, + const engine_to_agent_request_ptr& engine_to_agent_request, + check::completion_handler&& handler) { + return std::make_shared( + io_context, logger, start_expected, service, cmd_name, cmd_line, + engine_to_agent_request, 0, std::chrono::milliseconds(1500), + std::move(handler)); + }); + std::unique_lock l(m); + export_cond.wait(l); + + ASSERT_TRUE(exported_request); + ASSERT_EQ(exported_request->otel_request().resource_metrics_size(), 1); + const ::opentelemetry::proto::metrics::v1::ResourceMetrics& res = + exported_request->otel_request().resource_metrics()[0]; + const auto& res_attrib = res.resource().attributes(); + ASSERT_EQ(res_attrib.size(), 2); + ASSERT_EQ(res_attrib.at(0).key(), "host.name"); + ASSERT_EQ(res_attrib.at(0).value().string_value(), "my_host"); + ASSERT_EQ(res_attrib.at(1).key(), "service.name"); + ASSERT_EQ(res_attrib.at(1).value().string_value(), "serv1"); + ASSERT_EQ(res.scope_metrics_size(), 1); + const ::opentelemetry::proto::metrics::v1::ScopeMetrics& scope_metrics = + res.scope_metrics()[0]; + ASSERT_EQ(scope_metrics.metrics_size(), 1); + const ::opentelemetry::proto::metrics::v1::Metric metric = + scope_metrics.metrics()[0]; + ASSERT_EQ(metric.name(), "status"); + ASSERT_EQ(metric.description(), "Timeout at execution of /usr/bin/ls"); + ASSERT_EQ(metric.gauge().data_points_size(), 1); + const auto& data_point = metric.gauge().data_points()[0]; + ASSERT_EQ(data_point.as_int(), 3); + // timeout 1s + ASSERT_GE(data_point.time_unix_nano(), expected_completion_time + 1000000000); + ASSERT_LE(data_point.time_unix_nano(), expected_completion_time + 1500000000); + + sched->stop(); +} + +TEST_F(scheduler_test, correct_output_examplar) { + std::shared_ptr exported_request; + std::condition_variable export_cond; + time_point now = std::chrono::system_clock::now(); + std::shared_ptr sched = scheduler::load( + g_io_context, spdlog::default_logger(), "my_host", + create_conf(2, 1, 2, 10, 1), + [&](const std::shared_ptr& req) { + exported_request = req; + export_cond.notify_all(); + }, + [](const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point start_expected, const std::string& service, + const std::string& cmd_name, const std::string& cmd_line, + const engine_to_agent_request_ptr& engine_to_agent_request, + check::completion_handler&& handler) { + return std::make_shared( + io_context, logger, start_expected, service, cmd_name, cmd_line, + engine_to_agent_request, 0, std::chrono::milliseconds(10), + std::move(handler)); + }); + std::mutex m; + std::unique_lock l(m); + export_cond.wait(l); + + ASSERT_TRUE(exported_request); + + SPDLOG_INFO("export:{}", exported_request->otel_request().DebugString()); + + ASSERT_EQ(exported_request->otel_request().resource_metrics_size(), 2); + const ::opentelemetry::proto::metrics::v1::ResourceMetrics& res = + exported_request->otel_request().resource_metrics()[0]; + const auto& res_attrib = res.resource().attributes(); + ASSERT_EQ(res_attrib.size(), 2); + ASSERT_EQ(res_attrib.at(0).key(), "host.name"); + ASSERT_EQ(res_attrib.at(0).value().string_value(), "my_host"); + ASSERT_EQ(res_attrib.at(1).key(), "service.name"); + ASSERT_EQ(res_attrib.at(1).value().string_value(), "serv1"); + ASSERT_EQ(res.scope_metrics_size(), 1); + const ::opentelemetry::proto::metrics::v1::ScopeMetrics& scope_metrics = + res.scope_metrics()[0]; + ASSERT_GE(scope_metrics.metrics_size(), 5); + const ::opentelemetry::proto::metrics::v1::Metric metric = + scope_metrics.metrics()[0]; + ASSERT_EQ(metric.name(), "status"); + ASSERT_EQ(metric.description(), "Command OK: /usr/bin/ls"); + ASSERT_GE(metric.gauge().data_points_size(), 1); + const auto& data_point_state = metric.gauge().data_points()[0]; + ASSERT_EQ(data_point_state.as_int(), 0); + uint64_t first_time_point = data_point_state.time_unix_nano(); + + const ::opentelemetry::proto::metrics::v1::ResourceMetrics& res2 = + exported_request->otel_request().resource_metrics()[1]; + const auto& res_attrib2 = res2.resource().attributes(); + ASSERT_EQ(res_attrib2.size(), 2); + ASSERT_EQ(res_attrib2.at(0).key(), "host.name"); + ASSERT_EQ(res_attrib2.at(0).value().string_value(), "my_host"); + ASSERT_EQ(res_attrib2.at(1).key(), "service.name"); + ASSERT_EQ(res_attrib2.at(1).value().string_value(), "serv2"); + ASSERT_EQ(res2.scope_metrics_size(), 1); + + const ::opentelemetry::proto::metrics::v1::ScopeMetrics& scope_metrics2 = + res2.scope_metrics()[0]; + ASSERT_EQ(scope_metrics2.metrics_size(), 5); + const ::opentelemetry::proto::metrics::v1::Metric metric2 = + scope_metrics2.metrics()[0]; + ASSERT_EQ(metric2.name(), "status"); + ASSERT_EQ(metric2.description(), "Command OK: /usr/bin/ls"); + ASSERT_GE(metric2.gauge().data_points_size(), 1); + const auto& data_point_state2 = metric2.gauge().data_points()[0]; + ASSERT_EQ(data_point_state2.as_int(), 0); + + ASSERT_LE(first_time_point + 400000000, data_point_state2.time_unix_nano()); + ASSERT_GE(first_time_point + 600000000, data_point_state2.time_unix_nano()); + + sched->stop(); +} + +class concurent_check : public check { + asio::system_timer _completion_timer; + int _command_exit_status; + duration _completion_delay; + + public: + static std::set checked; + static std::set active_checks; + static unsigned max_active_check; + + concurent_check(const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point exp, + const std::string& serv, + const std::string& cmd_name, + const std::string& cmd_line, + const engine_to_agent_request_ptr& cnf, + int command_exit_status, + duration completion_delay, + check::completion_handler&& handler) + : check(io_context, + logger, + exp, + serv, + cmd_name, + cmd_line, + cnf, + std::move(handler)), + _completion_timer(*io_context), + _command_exit_status(command_exit_status), + _completion_delay(completion_delay) {} + + void start_check(const duration& timeout) override { + check::start_check(timeout); + active_checks.insert(this); + if (active_checks.size() > max_active_check) { + max_active_check = active_checks.size(); + } + _completion_timer.expires_from_now(_completion_delay); + _completion_timer.async_wait([me = shared_from_this(), this, + check_running_index = + _get_running_check_index()]( + const boost::system::error_code& err) { + active_checks.erase(this); + checked.insert(this); + SPDLOG_TRACE("end of completion timer for serv {}", get_service()); + me->on_completion( + check_running_index, _command_exit_status, + com::centreon::common::perfdata::parse_perfdata( + 0, 0, + "rta=0,031ms;200,000;500,000;0; pl=0%;40;80;; rtmax=0,109ms;;;; " + "rtmin=0,011ms;;;;", + _logger), + {fmt::format("Command OK: {}", me->get_command_line())}); + }); + } +}; + +std::set concurent_check::checked; +std::set concurent_check::active_checks; +unsigned concurent_check::max_active_check; + +TEST_F(scheduler_test, max_concurent) { + std::shared_ptr sched = scheduler::load( + g_io_context, spdlog::default_logger(), "my_host", + create_conf(200, 1, 1, 10, 1), + [&](const std::shared_ptr& req) {}, + [](const std::shared_ptr& io_context, + const std::shared_ptr& logger, + time_point start_expected, const std::string& service, + const std::string& cmd_name, const std::string& cmd_line, + const engine_to_agent_request_ptr& engine_to_agent_request, + check::completion_handler&& handler) { + return std::make_shared( + io_context, logger, start_expected, service, cmd_name, cmd_line, + engine_to_agent_request, 0, std::chrono::milliseconds(75), + std::move(handler)); + }); + + // to many tests to be completed in one second + std::this_thread::sleep_for(std::chrono::milliseconds(1100)); + ASSERT_LT(concurent_check::checked.size(), 200); + ASSERT_EQ(concurent_check::max_active_check, 10); + + // all tests must be completed in 1.5s + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + ASSERT_EQ(concurent_check::max_active_check, 10); + ASSERT_EQ(concurent_check::checked.size(), 200); + + sched->stop(); +} \ No newline at end of file diff --git a/engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/conf_helper.hh b/engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/conf_helper.hh new file mode 100644 index 00000000000..c3a0456eeae --- /dev/null +++ b/engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/conf_helper.hh @@ -0,0 +1,100 @@ +/** + * Copyright 2024 Centreon + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * For more information : contact@centreon.com + */ +#ifndef CCE_MOD_CONF_HELPER_OPENTELEMETRY_HH +#define CCE_MOD_CONF_HELPER_OPENTELEMETRY_HH + +#include "com/centreon/engine/host.hh" +#include "com/centreon/engine/macros.hh" +#include "com/centreon/engine/service.hh" + +#include "com/centreon/engine/commands/forward.hh" + +namespace com::centreon::engine::modules::opentelemetry { + +/** + * @brief extract opentelemetry commands from an host list + * This function must be called from engine main thread, not grpc ones + * + * @tparam command_handler callback called on every opentelemetry command found + * @param host_name name of the host supervised by the agent or telegraf + * @param handler + * @return true at least one opentelemetry command was found + * @return false + */ +template +bool get_otel_commands(const std::string& host_name, + command_handler&& handler, + const std::shared_ptr& logger) { + auto use_otl_command = [](const checkable& to_test) -> bool { + if (to_test.get_check_command_ptr()->get_type() == + commands::command::e_type::otel) + return true; + if (to_test.get_check_command_ptr()->get_type() == + commands::command::e_type::forward) { + return std::static_pointer_cast( + to_test.get_check_command_ptr()) + ->get_sub_command() + ->get_type() == commands::command::e_type::otel; + } + return false; + }; + + bool ret = false; + + auto hst_iter = host::hosts.find(host_name); + if (hst_iter == host::hosts.end()) { + SPDLOG_LOGGER_ERROR(logger, "unknown host:{}", host_name); + return false; + } + std::shared_ptr hst = hst_iter->second; + std::string cmd_line; + // host check use otl? + if (use_otl_command(*hst)) { + nagios_macros* macros(get_global_macros()); + + ret |= handler(hst->check_command(), hst->get_check_command_line(macros), + "", logger); + clear_volatile_macros_r(macros); + } else { + SPDLOG_LOGGER_DEBUG( + logger, "host {} doesn't use opentelemetry to do his check", host_name); + } + // services of host + auto serv_iter = service::services_by_id.lower_bound({hst->host_id(), 0}); + for (; serv_iter != service::services_by_id.end() && + serv_iter->first.first == hst->host_id(); + ++serv_iter) { + std::shared_ptr serv = serv_iter->second; + if (use_otl_command(*serv)) { + nagios_macros* macros(get_global_macros()); + ret |= + handler(serv->check_command(), serv->get_check_command_line(macros), + serv->name(), logger); + clear_volatile_macros_r(macros); + } else { + SPDLOG_LOGGER_DEBUG( + logger, + "host {} service {} doesn't use opentelemetry to do his check", + host_name, serv->name()); + } + } + return ret; +} + +} // namespace com::centreon::engine::modules::opentelemetry +#endif diff --git a/engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/telegraf/conf_server.hh b/engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/telegraf/conf_server.hh index 1e6a94b9f6b..989af594b33 100644 --- a/engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/telegraf/conf_server.hh +++ b/engine/modules/opentelemetry/inc/com/centreon/engine/modules/opentelemetry/telegraf/conf_server.hh @@ -75,7 +75,7 @@ class conf_session : public connection_class { void on_receive_request(const std::shared_ptr& request); void answer_to_request(const std::shared_ptr& request, - std::vector&& host_list); + const std::string& host); bool _get_commands(const std::string& host_name, std::string& request_body); diff --git a/engine/modules/opentelemetry/src/data_point_fifo.cc b/engine/modules/opentelemetry/src/data_point_fifo.cc index 3082d0644c5..00e4bec9d58 100644 --- a/engine/modules/opentelemetry/src/data_point_fifo.cc +++ b/engine/modules/opentelemetry/src/data_point_fifo.cc @@ -72,10 +72,10 @@ void data_point_fifo::clean() { /** * @brief erase oldest element * - * @param expiry data points oldest than this nano timestamp are erased + * @param expiry data points older than this nano timestamp are erased */ void data_point_fifo::clean_oldest(uint64_t expiry) { - while (!_fifo.empty() && _fifo.begin()->get_nano_timestamp() <= expiry) { + while (!_fifo.empty() && _fifo.begin()->get_nano_timestamp() < expiry) { _fifo.erase(_fifo.begin()); } } diff --git a/engine/modules/opentelemetry/src/telegraf/conf_server.cc b/engine/modules/opentelemetry/src/telegraf/conf_server.cc index b7b53fa2ec8..d6e4d720571 100644 --- a/engine/modules/opentelemetry/src/telegraf/conf_server.cc +++ b/engine/modules/opentelemetry/src/telegraf/conf_server.cc @@ -18,6 +18,7 @@ #include +#include "conf_helper.hh" #include "telegraf/conf_server.hh" #include "com/centreon/engine/globals.hh" @@ -37,7 +38,7 @@ using namespace com::centreon::engine; static constexpr std::string_view _config_schema(R"( { "$schema": "http://json-schema.org/draft-04/schema#", - "title": "grpc config", + "title": "telegraf config", "properties": { "http_server" : { "listen_address": { @@ -240,19 +241,18 @@ template void conf_session::on_receive_request( const std::shared_ptr& request) { boost::url_view parsed(request->target()); - std::vector host_list; + std::string host; for (const auto& get_param : parsed.params()) { if (get_param.key == "host") { - host_list.emplace_back(get_param.value); + host = get_param.value; } } auto to_call = std::packaged_task( - [me = shared_from_this(), request, - hosts = std::move(host_list)]() mutable -> int32_t { + [me = shared_from_this(), request, host]() mutable -> int32_t { // then we are in the main thread // services, hosts and commands are stable - me->answer_to_request(request, std::move(hosts)); + me->answer_to_request(request, host); return 0; }); command_manager::instance().enqueue(std::move(to_call)); @@ -386,15 +386,10 @@ bool conf_session::_get_commands(const std::string& host_name, template void conf_session::answer_to_request( const std::shared_ptr& request, - std::vector&& host_list) { + const std::string& host) { http::response_ptr resp(std::make_shared()); resp->version(request->version()); - if (host_list.empty()) { - SPDLOG_LOGGER_ERROR(this->_logger, "no host found in target argument {}", - *request); - } - resp->body() = fmt::format(R"(# Centreon telegraf configuration # This telegraf configuration is generated by centreon centengine [agent] @@ -407,10 +402,7 @@ void conf_session::answer_to_request( )", _telegraf_conf->get_check_interval(), _telegraf_conf->get_engine_otl_endpoint()); - bool at_least_one_found = false; - for (const std::string& host : host_list) { - at_least_one_found |= _get_commands(host, resp->body()); - } + bool at_least_one_found = _get_commands(host, resp->body()); if (at_least_one_found) { resp->result(boost::beast::http::status::ok); resp->insert(boost::beast::http::field::content_type, "text/plain"); diff --git a/tests/broker-engine/opentelemetry.robot b/tests/broker-engine/opentelemetry.robot index b9050880ad3..a85c8f6669d 100644 --- a/tests/broker-engine/opentelemetry.robot +++ b/tests/broker-engine/opentelemetry.robot @@ -116,11 +116,24 @@ BEOTEL_TELEGRAF_CHECK_HOST ${resources_list} Ctn Create Otl Request ${0} host_1 + # check without feed + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Host Check host_1 + ${result} Ctn Check Host Check Status With Timeout + ... host_1 + ... 35 + ... ${start} + ... 0 + ... (No output returned from host check) + Should Be True ${result} hosts table not updated + + Log To Console export metrics Ctn Send Otl To Engine 4317 ${resources_list} Sleep 5 + # feed and check ${start} Ctn Get Round Current Date Ctn Schedule Forced Host Check host_1 @@ -128,18 +141,6 @@ BEOTEL_TELEGRAF_CHECK_HOST ${result} Ctn Check Host Check Status With Timeout host_1 30 ${start} 0 OK Should Be True ${result} hosts table not updated - # check without feed - - ${start} Ctn Get Round Current Date - Ctn Schedule Forced Host Check host_1 - ${result} Ctn Check Host Check Status With Timeout - ... host_1 - ... 35 - ... ${start} - ... 0 - ... (No output returned from host check) - Should Be True ${result} hosts table not updated - # check then feed, three times to modify hard state ${start} Ctn Get Round Current Date Ctn Schedule Forced Host Check host_1 @@ -196,18 +197,6 @@ BEOTEL_TELEGRAF_CHECK_SERVICE ${resources_list} Ctn Create Otl Request ${0} host_1 service_1 - Log To Console export metrics - Ctn Send Otl To Engine 4317 ${resources_list} - - Sleep 5 - - # feed and check - ${start} Ctn Get Round Current Date - Ctn Schedule Forced Svc Check host_1 service_1 - - ${result} Ctn Check Service Check Status With Timeout host_1 service_1 30 ${start} 0 OK - Should Be True ${result} services table not updated - # check without feed ${start} Ctn Get Round Current Date @@ -221,6 +210,18 @@ BEOTEL_TELEGRAF_CHECK_SERVICE ... (No output returned from plugin) Should Be True ${result} services table not updated + Log To Console export metrics + Ctn Send Otl To Engine 4317 ${resources_list} + + Sleep 5 + + # feed and check + ${start} Ctn Get Round Current Date + Ctn Schedule Forced Svc Check host_1 service_1 + + ${result} Ctn Check Service Check Status With Timeout host_1 service_1 30 ${start} 0 OK + Should Be True ${result} services table not updated + # check then feed, three times to modify hard state ${start} Ctn Get Round Current Date Ctn Schedule Forced Svc Check host_1 service_1 @@ -302,7 +303,7 @@ BEOTEL_SERVE_TELEGRAF_CONFIGURATION_CRYPTED Sleep 1 ${telegraf_conf_response} GET ... verify=${False} - ... url=https://localhost:1443/engine?host=host_1&host=host_2&host=host_3 + ... url=https://localhost:1443/engine?host=host_1 Should Be Equal As Strings ${telegraf_conf_response.reason} OK no response received or error response ${content_compare_result} Ctn Compare String With File @@ -375,7 +376,7 @@ BEOTEL_SERVE_TELEGRAF_CONFIGURATION_NO_CRYPTED Should Be True ${result} "server listen on 0.0.0.0:1443" should be available. Sleep 1 ${telegraf_conf_response} GET - ... url=http://localhost:1443/engine?host=host_1&host=host_2&host=host_3 + ... url=http://localhost:1443/engine?host=host_1 Should Be Equal As Strings ${telegraf_conf_response.reason} OK no response received or error response