Skip to content

Commit

Permalink
MON-14511-poller-wait-at-most-5s-to-send-shutdown-event-before-shutdo…
Browse files Browse the repository at this point in the history
…wn (#351)

REFS:MON14511
  • Loading branch information
jean-christophe81 authored Sep 7, 2022
1 parent 9625276 commit 74e5bad
Show file tree
Hide file tree
Showing 25 changed files with 246 additions and 22 deletions.
34 changes: 21 additions & 13 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
cmake_minimum_required(VERSION 3.16)
project("Centreon Collect" C CXX)

if(NOT CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND NOT CMAKE_CXX_COMPILER_ID
STREQUAL "Clang")
STREQUAL "Clang")
message(
FATAL_ERROR "You can build broker with g++ or clang++. CMake will exit.")
endif()

# set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++14 -stdlib=libc++")
# set(CMAKE_CXX_COMPILER "clang++")
add_definitions("-D_GLIBCXX_USE_CXX11_ABI=1")
Expand All @@ -45,6 +47,7 @@ set(BUILD_ARGS "-w" "dupbuild=warn")
#
if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
file(STRINGS "/etc/os-release" release REGEX "^ID")

foreach(line ${release})
if(${line} MATCHES "ID_LIKE=.*")
string(REGEX REPLACE "ID_LIKE=\"(.*)\"" "\\1" like ${line})
Expand All @@ -54,15 +57,16 @@ if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
string(REGEX REPLACE "ID=\"(.*)\"" "\\1" id ${line})
endif()
endforeach()

string(TOLOWER "${like}" like)
string(TOLOWER "${id}" id)

if(("${id}" MATCHES "debian")
OR ("${like}" MATCHES "debian")
OR ("${id}" MATCHES "ubuntu")
OR ("${like}" MATCHES "ubuntu"))
OR("${like}" MATCHES "debian")
OR("${id}" MATCHES "ubuntu")
OR("${like}" MATCHES "ubuntu"))
set(OS_DISTRIBUTOR "Debian")
elseif(("${id}" MATCHES "centos") OR ("${like}" MATCHES "centos"))
elseif(("${id}" MATCHES "centos") OR("${like}" MATCHES "centos"))
set(OS_DISTRIBUTOR "CentOS")
else()
message(WARNING "lsb_release in not installed")
Expand All @@ -89,25 +93,28 @@ add_definitions(-DCENTREON_CONNECTOR_VERSION=\"${COLLECT_VERSION}\")
# ########### CONSTANTS ###########
set(USER_BROKER centreon-broker)
set(USER_ENGINE centreon-engine)
# ##############################################################################

# ##############################################################################
include(${CMAKE_BINARY_DIR}/conanbuildinfo.cmake)
conan_basic_setup(TARGETS)

find_package(Protobuf REQUIRED)

message(NOTICE "-- use protoc compiler: ${Protobuf_PROTOC_EXECUTABLE}")

include(GNUInstallDirs)

# var directories.
set(BROKER_VAR_LOG_DIR
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/log/centreon-broker")
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/log/centreon-broker")
set(BROKER_VAR_LIB_DIR
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/lib/centreon-broker")
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/lib/centreon-broker")
set(ENGINE_VAR_LOG_DIR
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/log/centreon-engine")
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/log/centreon-engine")
set(ENGINE_VAR_LOG_ARCHIVE_DIR
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/log/centreon-engine/archives")
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/log/centreon-engine/archives")
set(ENGINE_VAR_LIB_DIR
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/lib/centreon-engine")
"${CMAKE_INSTALL_FULL_LOCALSTATEDIR}/lib/centreon-engine")

set(CMAKE_INSTALL_PREFIX "/usr")
option(WITH_TESTING "Build unit tests." OFF)
Expand All @@ -116,6 +123,7 @@ option(WITH_CONF "Install configuration files." ON)

# Code coverage on unit tests
option(WITH_COVERAGE "Add code coverage on unit tests." OFF)

if(WITH_TESTING AND WITH_COVERAGE)
set(CMAKE_BUILD_TYPE "Debug")
include(cmake/CodeCoverage.cmake)
Expand All @@ -127,7 +135,7 @@ set(protobuf_MODULE_COMPATIBLE True)
add_definitions(${spdlog_DEFINITIONS})

include_directories(${CMAKE_SOURCE_DIR} ${CONAN_INCLUDE_DIRS}
${CMAKE_SOURCE_DIR}/clib/inc)
${CMAKE_SOURCE_DIR}/clib/inc)

add_subdirectory(bbdo)
add_subdirectory(broker)
Expand All @@ -144,4 +152,4 @@ add_custom_target(test-connector COMMAND tests/ut_connector)
add_custom_target(test DEPENDS test-broker test-engine test-clib test-connector)

add_custom_target(test-coverage DEPENDS broker-test-coverage
engine-test-coverage clib-test-coverage)
engine-test-coverage clib-test-coverage)
2 changes: 2 additions & 0 deletions broker/core/inc/com/centreon/broker/io/stream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class stream {
bool validate(std::shared_ptr<io::data> const& d, std::string const& error);
virtual int write(std::shared_ptr<data> const& d) = 0;
const std::string& get_name() const { return _name; }

virtual bool wait_for_all_events_written(unsigned ms_timeout);
};
} // namespace io

Expand Down
1 change: 0 additions & 1 deletion broker/core/inc/com/centreon/broker/multiplexing/muxer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ class muxer : public io::stream {
const bool _persistent;

std::unique_ptr<persistent_file> _file;

std::condition_variable _cv;
mutable std::mutex _mutex;
std::list<std::shared_ptr<io::data>> _events;
Expand Down
2 changes: 2 additions & 0 deletions broker/core/inc/com/centreon/broker/processing/acceptor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ class acceptor : public endpoint {
void set_read_filters(const absl::flat_hash_set<uint32_t>& filters);
void set_retry_interval(time_t retry_interval);
void set_write_filters(const absl::flat_hash_set<uint32_t>& filters);

bool wait_for_all_events_written(unsigned ms_timeout) override;
};
} // namespace processing

Expand Down
2 changes: 2 additions & 0 deletions broker/core/inc/com/centreon/broker/processing/endpoint.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class endpoint : public stat_visitable {
virtual void update() {}
virtual void start() = 0;
virtual void exit() = 0;

virtual bool wait_for_all_events_written(unsigned) { return true; }
};
} // namespace processing

Expand Down
1 change: 1 addition & 0 deletions broker/core/inc/com/centreon/broker/processing/failover.hh
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class failover : public endpoint {
void set_failover(std::shared_ptr<processing::failover> fo);
void set_retry_interval(time_t retry_interval);
void update() override;
bool wait_for_all_events_written(unsigned ms_timeout) override;

protected:
// From stat_visitable
Expand Down
2 changes: 2 additions & 0 deletions broker/core/inc/com/centreon/broker/processing/feeder.hh
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ class feeder : public stat_visitable {
feeder& operator=(const feeder&) = delete;
bool is_finished() const noexcept;
const char* get_state() const;

bool wait_for_all_events_written(unsigned ms_timeout);
};
} // namespace processing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class stat_visitable {
virtual void _forward_statistic(nlohmann::json& tree);

public:
static constexpr unsigned idle_microsec_wait_idle_thread_delay = 100000;

stat_visitable(std::string const& name = std::string());
virtual ~stat_visitable() noexcept = default;
stat_visitable(stat_visitable const& other) = delete;
Expand Down
5 changes: 5 additions & 0 deletions broker/core/src/config/applier/endpoint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ void endpoint::_discard() {
_discarding = true;
log_v2::config()->debug("endpoint applier: destruction");

// wait for failover and feeder to push endloop event
::usleep(processing::stat_visitable::idle_microsec_wait_idle_thread_delay +
100000);
// Exit threads.
{
log_v2::config()->debug("endpoint applier: requesting threads termination");
Expand All @@ -215,6 +218,7 @@ void endpoint::_discard() {
// We begin with feeders
for (auto it = _endpoints.begin(); it != _endpoints.end();) {
if (it->second->is_feeder()) {
it->second->wait_for_all_events_written(5000);
log_v2::config()->trace(
"endpoint applier: send exit signal to endpoint '{}'",
it->second->get_name());
Expand All @@ -240,6 +244,7 @@ void endpoint::_discard() {

// We continue with failovers
for (auto it = _endpoints.begin(); it != _endpoints.end();) {
it->second->wait_for_all_events_written(5000);
log_v2::config()->trace(
"endpoint applier: send exit signal on endpoint '{}'",
it->second->get_name());
Expand Down
15 changes: 15 additions & 0 deletions broker/core/src/io/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,18 @@ bool stream::validate(std::shared_ptr<io::data> const& d,
}
return true;
}

/**
* @brief if it has a substream, it waits until the substream has sent all data
* on the wire
*
* @param ms_timeout
* @return true all data sent
* @return false timeout expires
*/
bool stream::wait_for_all_events_written(unsigned ms_timeout) {
if (_substream) {
return _substream->wait_for_all_events_written(ms_timeout);
}
return true;
}
13 changes: 12 additions & 1 deletion broker/core/src/multiplexing/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,16 @@ void engine::publish(const std::shared_ptr<io::data>& e) {
std::lock_guard<std::mutex> lock(_engine_m);
switch (_state) {
case stopped:
log_v2::core()->trace("engine::publish one event to file");
_cache_file->add(e);
_unprocessed_events++;
break;
case not_started:
log_v2::core()->trace("engine::publish one event to queue");
_kiew.push_back(e);
break;
default:
log_v2::core()->trace("engine::publish one event to queue_");
_kiew.push_back(e);
if (!_sending_to_subscribers) {
_sending_to_subscribers = true;
Expand All @@ -105,16 +108,22 @@ void engine::publish(const std::list<std::shared_ptr<io::data>>& to_publish) {
std::lock_guard<std::mutex> lock(_engine_m);
switch (_state) {
case stopped:
log_v2::core()->trace("engine::publish {} event to file",
to_publish.size());
for (auto& e : to_publish) {
_cache_file->add(e);
_unprocessed_events++;
}
break;
case not_started:
log_v2::core()->trace("engine::publish {} event to queue",
to_publish.size());
for (auto& e : to_publish)
_kiew.push_back(e);
break;
default:
log_v2::core()->trace("engine::publish {} event to queue_",
to_publish.size());
for (auto& e : to_publish)
_kiew.push_back(e);
if (!_sending_to_subscribers) {
Expand Down Expand Up @@ -187,7 +196,9 @@ void engine::stop() {
// Make sure that no more data is available.
if (!_sending_to_subscribers) {
log_v2::core()->info(
"multiplexing: sending events to muxers for the last time");
"multiplexing: sending events to muxers for the last time {} "
"events to send",
_kiew.size());
_sending_to_subscribers = true;
lock.unlock();
std::promise<void> promise;
Expand Down
16 changes: 16 additions & 0 deletions broker/core/src/processing/acceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,19 @@ void acceptor::_callback() noexcept {
_state = acceptor::finished;
_state_cv.notify_all();
}

/**
* @brief
*
* @param ms_timeout
* @return true
* @return false
*/
bool acceptor::wait_for_all_events_written(unsigned ms_timeout) {
std::lock_guard<std::mutex> lock(_stat_mutex);
bool ret = true;
for (processing::feeder* to_wait : _feeders) {
ret &= to_wait->wait_for_all_events_written(ms_timeout);
}
return ret;
}
10 changes: 9 additions & 1 deletion broker/core/src/processing/failover.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ void failover::_run() {
we = _stream->flush();
}
_muxer->ack_events(we);
::usleep(100000);
::usleep(idle_microsec_wait_idle_thread_delay);
}
}
}
Expand Down Expand Up @@ -576,3 +576,11 @@ void failover::start() {
bool failover::should_exit() const {
return _should_exit;
}

bool failover::wait_for_all_events_written(unsigned ms_timeout) {
std::lock_guard<std::timed_mutex> stream_lock(_stream_m);
if (_stream) {
return _stream->wait_for_all_events_written(ms_timeout);
}
return true;
}
10 changes: 9 additions & 1 deletion broker/core/src/processing/feeder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ void feeder::_callback() noexcept {
log_v2::processing()->trace(
"feeder '{}': timeout on stream and muxer, waiting for 100000µs",
_name);
::usleep(100000);
::usleep(idle_microsec_wait_idle_thread_delay);
}
}
} catch (exceptions::shutdown const& e) {
Expand Down Expand Up @@ -252,3 +252,11 @@ const char* feeder::get_state() const {
}
return "unknown";
}

bool feeder::wait_for_all_events_written(unsigned ms_timeout) {
misc::read_lock lock(_client_m);
if (_client) {
return _client->wait_for_all_events_written(ms_timeout);
}
return true;
}
4 changes: 3 additions & 1 deletion broker/grpc/inc/com/centreon/broker/grpc/channel.hh
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class channel : public std::enable_shared_from_this<channel> {
grpc_config::pointer _conf;

mutable std::mutex _protect;
mutable std::condition_variable _read_cond;
mutable std::condition_variable _read_cond, _write_cond;

channel(const std::string& class_name, const grpc_config::pointer& conf);

Expand Down Expand Up @@ -130,6 +130,8 @@ class channel : public std::enable_shared_from_this<channel> {
int write(const event_ptr&);
int flush();
virtual int stop();

bool wait_for_all_events_written(unsigned ms_timeout);
};
} // namespace grpc

Expand Down
2 changes: 2 additions & 0 deletions broker/grpc/inc/com/centreon/broker/grpc/stream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class stream : public io::stream {
int32_t stop() override;

bool is_down() const;

bool wait_for_all_events_written(unsigned ms_timeout) override;
};
} // namespace grpc

Expand Down
16 changes: 16 additions & 0 deletions broker/grpc/src/channel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ void channel::on_write_done(bool ok) {
_write_queue.pop_front();
data_to_write = !_write_queue.empty();
}
_write_cond.notify_all();
if (data_to_write) {
start_write();
}
Expand All @@ -226,3 +227,18 @@ void channel::on_write_done(bool ok) {
int channel::flush() {
return 0;
}

/**
* @brief wait for all events sent on the wire
*
* @param ms_timeout
* @return true if all events are sent
* @return false if timeout expires
*/
bool channel::wait_for_all_events_written(unsigned ms_timeout) {
log_v2::grpc()->trace("wait_for_all_events_written _write_queue.size()={}",
_write_queue.size());
unique_lock l(_protect);
return _write_cond.wait_for(l, std::chrono::milliseconds(ms_timeout),
[this]() { return _write_queue.empty(); });
}
16 changes: 16 additions & 0 deletions broker/grpc/src/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,19 @@ int32_t com::centreon::broker::grpc::stream::stop() {
bool com::centreon::broker::grpc::stream::is_down() const {
return _channel->is_down();
}

/**
* @brief wait for connection write queue empty
*
* @param ms_timeout
* @return true queue is empty
* @return false timeout expired
*/
bool com::centreon::broker::grpc::stream::wait_for_all_events_written(
unsigned ms_timeout) {
if (_channel->is_down()) {
return true;
}

return _channel->wait_for_all_events_written(ms_timeout);
}
Loading

0 comments on commit 74e5bad

Please sign in to comment.