Skip to content

Commit

Permalink
enh(broker/queue_file): queue file stats added to muxer stats (#140)
Browse files Browse the repository at this point in the history
Others points:
* fix(coverage): coverage restoration
* chore(doc): READLE updated

REFS: MON-11318
  • Loading branch information
bouda1 committed Feb 7, 2022
1 parent 82e6ff2 commit 62b1cd9
Show file tree
Hide file tree
Showing 30 changed files with 260 additions and 812 deletions.
12 changes: 12 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ option(WITH_TESTING "Build unit tests." OFF)

option(WITH_CONF "Install configuration files." ON)

# Code coverage on unit tests
option(WITH_COVERAGE "Add code coverage on unit tests." OFF)
if (WITH_TESTING AND WITH_COVERAGE)
set(CMAKE_BUILD_TYPE "Debug")
include(cmake/CodeCoverage.cmake)
APPEND_COVERAGE_COMPILER_FLAGS()
endif ()

set(protobuf_MODULE_COMPATIBLE True)

add_definitions(${spdlog_DEFINITIONS})
Expand Down Expand Up @@ -97,3 +105,7 @@ add_custom_target(test-connector
add_custom_target(test
DEPENDS test-broker test-engine test-clib test-connector
)

add_custom_target(test-coverage
DEPENDS broker-test-coverage engine-test-coverage clib-test-coverage
)
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,4 +222,4 @@ maintainers:

<a href="https://github.com/bouda1"><img src="https://avatars1.githubusercontent.com/u/6324413?s=400&v=4" title="David Boucher" width="80" height="80"></a> &nbsp;
<a href="https://github.com/rem31"><img src="https://avatars.githubusercontent.com/u/73845199?s=460&v=4" title="Rémi Gres" width="80" height="80"></a> &nbsp;
<a href="https://github.com/Maaown"><img src="https://avatars.githubusercontent.com/u/39004150?v=4" title="Leonard Viktor Pooch" width="80" height="80"></a> &nbsp;
<a href="https://github.com/jean-christophe81"><img src="https://avatars.githubusercontent.com/u/98889244?v=4" title="Jean-Christophe Roques" width="80" height="80"></a> &nbsp;
8 changes: 0 additions & 8 deletions centreon-broker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,6 @@ else ()
set(GROUP "root")
endif ()

# Code coverage on unit tests
option(WITH_COVERAGE "Add code coverage on unit tests." OFF)
if (WITH_TESTING AND WITH_COVERAGE)
set(CMAKE_BUILD_TYPE "Debug")
include(cmake/CodeCoverage.cmake)
APPEND_COVERAGE_COMPILER_FLAGS()
endif ()

# Set startup script to auto if not defined.
if (NOT WITH_STARTUP_SCRIPT)
set(WITH_STARTUP_SCRIPT "auto")
Expand Down
6 changes: 3 additions & 3 deletions centreon-broker/core/inc/com/centreon/broker/file/splitter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ namespace file {
class splitter : public fs_file {
bool _auto_delete;
std::string _base_path;
long _max_file_size;
const uint32_t _max_file_size;
std::shared_ptr<FILE> _rfile;
std::mutex* _rmutex;
int32_t _rid;
Expand All @@ -87,7 +87,7 @@ class splitter : public fs_file {
public:
splitter(std::string const& path,
fs_file::open_mode mode,
long max_file_size = 100000000,
uint32_t max_file_size = 100000000u,
bool auto_delete = false);
~splitter();
splitter(const splitter&) = delete;
Expand All @@ -102,7 +102,7 @@ class splitter : public fs_file {
void flush() override;

std::string get_file_path(int id = 0) const;
long get_max_file_size() const;
uint32_t max_file_size() const;
int32_t get_rid() const;
long get_roffset() const;
int32_t get_wid() const;
Expand Down
14 changes: 12 additions & 2 deletions centreon-broker/core/inc/com/centreon/broker/file/stream.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <memory>
#include <mutex>
#include "broker.pb.h"
#include "com/centreon/broker/file/splitter.hh"
#include "com/centreon/broker/io/stream.hh"
#include "com/centreon/broker/namespace.hh"
Expand All @@ -36,13 +37,21 @@ namespace file {
*/
class stream : public io::stream {
std::unique_ptr<splitter> _file;
QueueFileStats* _stats;
std::time_t _last_stats;
std::time_t _last_stats_perc;
mutable long long _last_read_offset;
mutable time_t _last_time;
mutable long long _last_write_offset;
std::array<std::pair<int64_t, double>, 10> _stats_perc;
size_t _stats_idx;
size_t _stats_size;

void _update_stats();

public:
stream(splitter* file);
~stream();
stream(splitter* file, QueueFileStats* s);
~stream() noexcept = default;
stream(const stream&) = delete;
stream& operator=(const stream&) = delete;
std::string peer() const override;
Expand All @@ -51,6 +60,7 @@ class stream : public io::stream {
void statistics(nlohmann::json& tree) const override;
int32_t write(std::shared_ptr<io::data> const& d) override;
int32_t stop() override;
uint32_t max_file_size() const;
};
} // namespace file

Expand Down
4 changes: 2 additions & 2 deletions centreon-broker/core/inc/com/centreon/broker/misc/math.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ namespace misc {
*
* @param pts An array of pairs (x, y)
* @param size If shorter than the array size, only the first size pairs of
* the array are taken for the regression regression.
* the array are taken for the regression.
* @param a The computed value for the linear coefficient a (output)
* @param b The computed value for the constant term b (output)
* @param b The computed value for the constant term b (output)
*
* @return true if no error are seen, false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ CCB_BEGIN()
* It uses BBDO, compression and file streams.
*/
class persistent_file : public io::stream {
QueueFileStats* _stats;
std::shared_ptr<file::stream> _splitter;

public:
persistent_file(const std::string& path);
persistent_file(const std::string& path, QueueFileStats* stats = nullptr);
~persistent_file() noexcept = default;
persistent_file(const persistent_file&) = delete;
persistent_file& operator=(const persistent_file&) = delete;
Expand Down
7 changes: 6 additions & 1 deletion centreon-broker/core/inc/com/centreon/broker/stats/center.hh
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,13 @@ class center {
std::string queue_file,
uint32_t size,
uint32_t unack);
void init_queue_file(std::string muxer,
std::string queue_file,
uint32_t max_file_size);

bool get_muxer_stats(const std::string& name, MuxerStats* response);
bool muxer_stats(const std::string& name, MuxerStats* response);
MuxerStats* muxer_stats(const std::string& name);
void clear_muxer_queue_file(const std::string& name);

void get_sql_connection_stats(uint32_t index, SqlConnectionStats* response);
void get_conflict_manager_stats(ConflictManagerStats* response);
Expand Down
14 changes: 13 additions & 1 deletion centreon-broker/core/src/broker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,22 @@ message EngineStats {
uint32 processed_events = 2;
}

message QueueFileStats {
string name = 1;
uint32 max_file_size = 2;
uint32 file_write_path = 3;
uint32 file_write_offset = 4;
uint32 file_read_path = 5;
uint32 file_read_offset = 6;
double file_percent_processed = 7;
int64 file_expected_terminated_at = 8;
string file_expected_terminated_in = 9;
}

message MuxerStats {
uint32 total_events = 1;
uint32 unacknowledged_events = 2;
string queue_file = 3;
QueueFileStats queue_file = 3;
}

message ProcessingStats {
Expand Down
2 changes: 1 addition & 1 deletion centreon-broker/core/src/broker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ grpc::Status broker_impl::GetMuxerStats(grpc::ServerContext* context
const GenericString* request,
MuxerStats* response) {
const std::string name = request->str_arg();
auto status = stats::center::instance().get_muxer_stats(name, response);
bool status = stats::center::instance().muxer_stats(name, response);
return status ? grpc::Status::OK
: grpc::Status(
grpc::StatusCode::NOT_FOUND,
Expand Down
7 changes: 4 additions & 3 deletions centreon-broker/core/src/file/opener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ opener::~opener() {}
*/
std::unique_ptr<io::stream> opener::open() {
// Open splitted file.
std::unique_ptr<io::stream> retval = std::unique_ptr<stream>(
new stream(new splitter(_filename, fs_file::open_read_write_truncate,
_max_size, _auto_delete)));
std::unique_ptr<io::stream> retval{std::make_unique<stream>(
new splitter(_filename, fs_file::open_read_write_truncate, _max_size,
_auto_delete),
nullptr)};
return retval;
}

Expand Down
14 changes: 4 additions & 10 deletions centreon-broker/core/src/file/splitter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ using namespace com::centreon::broker::file;
*/
splitter::splitter(std::string const& path,
fs_file::open_mode mode,
long max_file_size,
uint32_t max_file_size,
bool auto_delete)
: _auto_delete{auto_delete},
_base_path{path},
_max_file_size{max_file_size},
_max_file_size{max_file_size == 0u ? std::numeric_limits<uint32_t>::max()
: std::max(max_file_size, 10000u)},
_rfile{},
_rmutex{nullptr},
_rid{0},
Expand All @@ -66,13 +67,6 @@ splitter::splitter(std::string const& path,
_woffset{0} {
(void)mode;

// Set max file size.
static long min_file_size(10000);
if (!_max_file_size)
_max_file_size = std::numeric_limits<long>::max();
else if (_max_file_size < min_file_size)
_max_file_size = min_file_size;

// Get IDs of already existing file parts. File parts are suffixed
// with their order number. A file named /var/lib/foo would have
// parts named /var/lib/foo, /var/lib/foo1, /var/lib/foo2, ...
Expand Down Expand Up @@ -281,7 +275,7 @@ std::string splitter::get_file_path(int id) const {
*
* @return Max file size.
*/
long splitter::get_max_file_size() const {
uint32_t splitter::max_file_size() const {
return _max_file_size;
}

Expand Down
Loading

0 comments on commit 62b1cd9

Please sign in to comment.