Skip to content

Commit

Permalink
Mon 15407 add protobuf instance status message (#405)
Browse files Browse the repository at this point in the history
* MON-15407-add-protobuf-instance_status-message
REFS:MON-15407
  • Loading branch information
jean-christophe81 authored Oct 13, 2022
1 parent 0fb4a04 commit f438f81
Show file tree
Hide file tree
Showing 16 changed files with 291 additions and 37 deletions.
8 changes: 5 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

# Set necessary settings.
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
set(CMAKE_COLOR_DIAGNOSTICS ON)
cmake_minimum_required(VERSION 3.16)
project("Centreon Collect" C CXX)

Expand All @@ -35,11 +36,12 @@ endif()
# set(CMAKE_CXX_COMPILER "clang++")
add_definitions("-D_GLIBCXX_USE_CXX11_ABI=1")
option(NG "C++17 build." OFF)
if (NG)

if(NG)
set(CMAKE_CXX_STANDARD 17)
else ()
else()
set(CMAKE_CXX_STANDARD 14)
endif ()
endif()

set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_CXX_EXTENSIONS OFF)
Expand Down
1 change: 1 addition & 0 deletions bbdo/events.hh
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ enum data_element {
de_pb_host_check = 39,
de_pb_service_check = 40,
de_pb_log_entry = 41,
de_pb_instance_status = 42
};
} // namespace neb
namespace storage {
Expand Down
22 changes: 22 additions & 0 deletions bbdo/neb_proto.proto
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,25 @@ message LogEntry {
int32 status = 12;
int32 retry = 13;
}

message InstanceStatus {
BBDOHeader header = 1;

bool event_handlers = 2;
bool flap_detection = 3;
bool notifications = 4;
bool active_host_checks = 5;
bool active_service_checks = 6;
bool check_hosts_freshness = 7;
bool check_services_freshness = 8;
string global_host_event_handler = 9;
string global_service_event_handler = 10;
uint64 last_alive = 11;
uint64 last_command_check = 12;
bool obsess_over_hosts = 13;
bool obsess_over_services = 14;
bool passive_host_checks = 15;
bool passive_service_checks = 16;
uint64 instance_id = 17;

}
2 changes: 2 additions & 0 deletions broker/core/src/query_preparator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ mysql_stmt query_preparator::prepare_insert_or_update_table(
google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(
fmt::format("com.centreon.broker.{}", info->get_name()));

assert(desc);

std::vector<std::tuple<std::string, uint32_t, uint16_t>> pb_mapping;

std::string key;
Expand Down
1 change: 1 addition & 0 deletions broker/neb/inc/com/centreon/broker/neb/callbacks.hh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ int callback_pb_log(int callback_type, void* data);
int callback_module(int callback_type, void* data);
int callback_process(int callback_type, void* data);
int callback_program_status(int callback_type, void* data);
int callback_pb_program_status(int callback_type, void* data);
int callback_relation(int callback_type, void* data);
int callback_service(int callback_type, void* data);
int callback_service_check(int callback_type, void* data);
Expand Down
3 changes: 3 additions & 0 deletions broker/neb/inc/com/centreon/broker/neb/internal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ using pb_service_check =

using pb_log_entry =
io::protobuf<LogEntry, make_type(io::neb, neb::de_pb_log_entry)>;
using pb_instance_status =
io::protobuf<InstanceStatus,
make_type(io::neb, neb::de_pb_instance_status)>;

} // namespace neb

Expand Down
3 changes: 3 additions & 0 deletions broker/neb/src/broker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ void broker_module_init(void const* arg) {
e.register_event(make_type(io::neb, neb::de_pb_log_entry), "LogEntry",
&neb::pb_log_entry::operations, "logs");

e.register_event(make_type(io::neb, neb::de_pb_instance_status),
"InstanceStatus", &neb::pb_instance_status::operations,
"instances");
}
}
}
Expand Down
62 changes: 61 additions & 1 deletion broker/neb/src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ static struct {
{NEBCALLBACK_HOST_CHECK_DATA, &neb::callback_host_check},
{NEBCALLBACK_HOST_CHECK_DATA, &neb::callback_pb_host_check},
{NEBCALLBACK_HOST_STATUS_DATA, &neb::callback_pb_host_status},
{NEBCALLBACK_PROGRAM_STATUS_DATA, &neb::callback_program_status},
{NEBCALLBACK_PROGRAM_STATUS_DATA, &neb::callback_pb_program_status},
{NEBCALLBACK_SERVICE_CHECK_DATA, &neb::callback_pb_service_check},
{NEBCALLBACK_SERVICE_STATUS_DATA, &neb::callback_pb_service_status},
{NEBCALLBACK_ADAPTIVE_SEVERITY_DATA, &neb::callback_severity},
Expand Down Expand Up @@ -2381,6 +2381,66 @@ int neb::callback_program_status(int callback_type, void* data) {
return 0;
}

/**
* @brief Function that process instance status data.
*
* This function is called by Nagios when some instance status data are
* available.
*
* @param[in] callback_type Type of the callback
* (NEBCALLBACK_PROGRAM_STATUS_DATA).
* @param[in] data A pointer to a nebstruct_program_status_data
* containing the program status data.
*
* @return 0 on success.
*/
int neb::callback_pb_program_status(int, void* data) {
// Log message.
SPDLOG_LOGGER_INFO(log_v2::neb(),
"callbacks: generating pb instance status event");

// In/Out variables.
std::shared_ptr<neb::pb_instance_status> is_obj{
std::make_shared<neb::pb_instance_status>()};
InstanceStatus& is(is_obj->mut_obj());

// Fill output var.
const nebstruct_program_status_data& program_status_data =
*static_cast<nebstruct_program_status_data*>(data);

SPDLOG_LOGGER_INFO(log_v2::neb(),
"callbacks: generating pb instance status event "
"global_service_event_handler={}",
program_status_data.global_host_event_handler);

is.set_instance_id(config::applier::state::instance().poller_id());
is.set_active_host_checks(program_status_data.active_host_checks_enabled);
is.set_active_service_checks(
program_status_data.active_service_checks_enabled);
is.set_check_hosts_freshness(check_host_freshness);
is.set_check_services_freshness(check_service_freshness);
is.set_event_handlers(program_status_data.event_handlers_enabled);
is.set_flap_detection(program_status_data.flap_detection_enabled);
if (!program_status_data.global_host_event_handler.empty())
is.set_global_host_event_handler(misc::string::check_string_utf8(
program_status_data.global_host_event_handler));
if (!program_status_data.global_service_event_handler.empty())
is.set_global_service_event_handler(misc::string::check_string_utf8(
program_status_data.global_service_event_handler));
is.set_last_alive(time(nullptr));
is.set_last_command_check(program_status_data.last_command_check);
is.set_notifications(program_status_data.notifications_enabled);
is.set_obsess_over_hosts(program_status_data.obsess_over_hosts);
is.set_obsess_over_services(program_status_data.obsess_over_services);
is.set_passive_host_checks(program_status_data.passive_host_checks_enabled);
is.set_passive_service_checks(
program_status_data.passive_service_checks_enabled);

// Send event.
gl_publisher.write(is_obj);
return 0;
}

/**
* @brief Function that process relation data.
*
Expand Down
8 changes: 4 additions & 4 deletions broker/neb/test/instance_status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

using namespace com::centreon::broker;

class InstanceStatus : public ::testing::Test {
class TestInstanceStatus : public ::testing::Test {
void SetUp() override { randomize_init(); };

void TearDown() override { randomize_cleanup(); };
Expand All @@ -34,7 +34,7 @@ class InstanceStatus : public ::testing::Test {
/**
* Check instance_status's assignment operator.
*/
TEST_F(InstanceStatus, Assign) {
TEST_F(TestInstanceStatus, Assign) {
// Object #1.
neb::instance_status is1;
std::vector<randval> randvals1;
Expand All @@ -59,7 +59,7 @@ TEST_F(InstanceStatus, Assign) {
/**
* Check instance_status's copy constructor.
*/
TEST_F(InstanceStatus, CopyCtor) {
TEST_F(TestInstanceStatus, CopyCtor) {
// Object #1.
neb::instance_status is1;
std::vector<randval> randvals1;
Expand All @@ -80,7 +80,7 @@ TEST_F(InstanceStatus, CopyCtor) {
/**
* Check instance_status's default constructor.
*/
TEST_F(InstanceStatus, DefaultCtor) {
TEST_F(TestInstanceStatus, DefaultCtor) {
// Object.
neb::instance_status is;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class stream : public io::stream {
database::mysql_stmt _host_status_update;
database::mysql_stmt _instance_insupdate;
database::mysql_stmt _instance_status_insupdate;
database::mysql_stmt _pb_instance_status_insupdate;
database::mysql_stmt _module_insert;
database::mysql_stmt _service_check_update;
database::mysql_stmt _pb_service_check_update;
Expand Down Expand Up @@ -339,6 +340,7 @@ class stream : public io::stream {
void _process_host_status(const std::shared_ptr<io::data>& d);
void _process_instance(const std::shared_ptr<io::data>& d);
void _process_instance_status(const std::shared_ptr<io::data>& d);
void _process_pb_instance_status(const std::shared_ptr<io::data>& d);
void _process_log(const std::shared_ptr<io::data>& d);
void _process_module(const std::shared_ptr<io::data>& d);
void _process_service_check(const std::shared_ptr<io::data>& d);
Expand Down
2 changes: 1 addition & 1 deletion broker/unified_sql/src/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void (stream::*const stream::_neb_processing_table[])(
&stream::_process_pb_host_check,
&stream::_process_pb_service_check,
&stream::_process_pb_log,
};
&stream::_process_pb_instance_status};

stream::stream(const database_config& dbcfg,
uint32_t rrd_len,
Expand Down
107 changes: 86 additions & 21 deletions broker/unified_sql/src/stream_sql.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2144,6 +2144,69 @@ void stream::_process_instance_status(const std::shared_ptr<io::data>& d) {
}
}

/**
* Process an instance status event. To work on an instance status, we must
* be sure the instance already exists in the database. So this query must
* be done by the same thread as the one that created the instance.
*
* @param[in] e Uncasted instance status.
*
* @return The number of events that can be acknowledged.
*/
void stream::_process_pb_instance_status(const std::shared_ptr<io::data>& d) {
const neb::pb_instance_status& is_obj =
*static_cast<neb::pb_instance_status*>(d.get());
const InstanceStatus& is = is_obj.obj();

int32_t conn = _mysql.choose_connection_by_instance(is.instance_id());

_finish_action(-1, actions::hosts | actions::acknowledgements |
actions::modules | actions::downtimes |
actions::comments);

// Log message.
SPDLOG_LOGGER_INFO(
log_v2::sql(),
"SQL: processing poller status event (id: {}, last alive: {} {})",
is.instance_id(), is.last_alive(), is.DebugString());

// Processing.
if (_is_valid_poller(is.instance_id())) {
// Prepare queries.
if (!_pb_instance_status_insupdate.prepared()) {
query_preparator::event_pb_unique unique{
{17, "instance_id", io::protobuf_base::invalid_on_zero, 0}};
query_preparator qp(neb::pb_instance_status::static_type(), unique);
_pb_instance_status_insupdate = qp.prepare_insert_or_update_table(
_mysql, "instances ",
{{17, "instance_id", io::protobuf_base::invalid_on_zero, 0},
{2, "event_handlers", 0, 0},
{3, "flap_detection", 0, 0},
{4, "notifications", 0, 0},
{5, "active_host_checks", 0, 0},
{6, "active_service_checks", 0, 0},
{7, "check_hosts_freshness", 0, 0},
{8, "check_services_freshness", 0, 0},
{9, "global_host_event_handler", 0,
get_instances_col_size(instances_global_host_event_handler)},
{10, "global_service_event_handler", 0,
get_instances_col_size(instances_global_service_event_handler)},
{11, "last_alive", 0, 0},
{12, "last_command_check", 0, 0},
{13, "obsess_over_hosts", 0, 0},
{14, "obsess_over_services", 0, 0},
{15, "passive_host_checks", 0, 0},
{16, "passive_service_checks", 0, 0}});
}

// Process object.
_pb_instance_status_insupdate << is_obj;
_mysql.run_statement(_pb_instance_status_insupdate,
database::mysql_error::update_poller, false, conn);
_add_action(conn, actions::instances);
}
}

/**
* Process a log event.
*
Expand Down Expand Up @@ -2305,11 +2368,11 @@ void stream::_process_service_check(const std::shared_ptr<io::data>& d) {
|| (sc.next_check >= now - 5 * 60) ||
!sc.next_check) { // - initial state
// Apply to DB.
SPDLOG_LOGGER_INFO(
log_v2::sql(),
"SQL: processing service check event (host: {}, service: {}, command: "
"{})",
sc.host_id, sc.service_id, sc.command_line);
SPDLOG_LOGGER_INFO(log_v2::sql(),
"SQL: processing service check event (host: {}, "
"service: {}, command: "
"{})",
sc.host_id, sc.service_id, sc.command_line);

// Prepare queries.
if (!_service_check_update.prepared()) {
Expand Down Expand Up @@ -2607,11 +2670,12 @@ void stream::_process_service_group_member(const std::shared_ptr<io::data>& d) {
// Delete.
else {
// Log message.
SPDLOG_LOGGER_INFO(
log_v2::sql(),
"SQL: disabling membership of service ({}, {}) to service group {} on "
"instance {}",
sgm.host_id, sgm.service_id, sgm.group_id, sgm.poller_id);
SPDLOG_LOGGER_INFO(log_v2::sql(),
"SQL: disabling membership of service ({}, {}) to "
"service group {} on "
"instance {}",
sgm.host_id, sgm.service_id, sgm.group_id,
sgm.poller_id);

if (!_service_group_member_delete.prepared()) {
query_preparator::event_unique unique;
Expand Down Expand Up @@ -3374,13 +3438,14 @@ void stream::_check_and_update_index_cache(const Service& ss) {
it_index_cache->second.host_name = fmt::to_string(hv);
it_index_cache->second.service_description = fmt::to_string(sv);
it_index_cache->second.interval = ss.check_interval();
SPDLOG_LOGGER_DEBUG(
log_v2::sql(),
"Updating index_data for host_id={} and service_id={}: host_name={}, "
"service_description={}, check_interval={}",
ss.host_id(), ss.service_id(), it_index_cache->second.host_name,
it_index_cache->second.service_description,
it_index_cache->second.interval);
SPDLOG_LOGGER_DEBUG(log_v2::sql(),
"Updating index_data for host_id={} and "
"service_id={}: host_name={}, "
"service_description={}, check_interval={}",
ss.host_id(), ss.service_id(),
it_index_cache->second.host_name,
it_index_cache->second.service_description,
it_index_cache->second.interval);
}
}
}
Expand Down Expand Up @@ -3671,10 +3736,10 @@ void stream::_process_severity(const std::shared_ptr<io::data>& d) {
// Processed object.
auto s{static_cast<const neb::pb_severity*>(d.get())};
auto& sv = s->obj();
SPDLOG_LOGGER_TRACE(
log_v2::sql(),
"SQL: severity event with id={}, type={}, name={}, level={}, icon_id={}",
sv.id(), sv.type(), sv.name(), sv.level(), sv.icon_id());
SPDLOG_LOGGER_TRACE(log_v2::sql(),
"SQL: severity event with id={}, type={}, name={}, "
"level={}, icon_id={}",
sv.id(), sv.type(), sv.name(), sv.level(), sv.icon_id());
uint64_t severity_id = _severity_cache[{sv.id(), sv.type()}];
int32_t conn = special_conn::severity % _mysql.connections_count();
switch (sv.action()) {
Expand Down
2 changes: 1 addition & 1 deletion tests/broker-engine/external-commands.robot
Original file line number Diff line number Diff line change
Expand Up @@ -2614,4 +2614,4 @@ BEHOSTCHECK
Execute SQL String UPDATE hosts set command_line='toto' where name='host_1'
schedule_forced_host_check host_1
${result}= check_host_check_with_timeout host_1 30 ${VarRoot}/lib/centreon-engine/check.pl 0 1
Should Be True ${result} msg=hosts table not updated
Should Be True ${result} msg=hosts table not updated
Loading

0 comments on commit f438f81

Please sign in to comment.