Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mon 12065 async engine 21.10 #129

Merged
merged 4 commits into from
Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@

#### Fixes

*multiplexing*

The multiplexing engine works now asynchronously compared to its muxers. This
improves a lot performances.

*rrd*

Add SQL query to check metrics to delete with their associated files.
Expand Down
2 changes: 0 additions & 2 deletions centreon-broker/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,6 @@ set(LIBROKER_SOURCES
${SRC_DIR}/misc/variant.cc
${SRC_DIR}/modules/handle.cc
${SRC_DIR}/multiplexing/engine.cc
${SRC_DIR}/multiplexing/hooker.cc
${SRC_DIR}/multiplexing/muxer.cc
${SRC_DIR}/multiplexing/publisher.cc
${SRC_DIR}/mysql.cc
Expand Down Expand Up @@ -517,7 +516,6 @@ set(LIBROKER_SOURCES
${INC_DIR}/misc/variant.hh
${INC_DIR}/modules/handle.hh
${INC_DIR}/multiplexing/engine.hh
${INC_DIR}/multiplexing/hooker.hh
${INC_DIR}/multiplexing/muxer.hh
${INC_DIR}/multiplexing/publisher.hh
${INC_DIR}/multiplexing/subscriber.hh
Expand Down
45 changes: 19 additions & 26 deletions centreon-broker/core/inc/com/centreon/broker/multiplexing/engine.hh
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@
#ifndef CCB_MULTIPLEXING_ENGINE_HH
#define CCB_MULTIPLEXING_ENGINE_HH

#include <deque>
#include <list>
#include <memory>
#include <mutex>
#include <queue>

#include "com/centreon/broker/multiplexing/hooker.hh"
#include "com/centreon/broker/namespace.hh"
#include "com/centreon/broker/persistent_cache.hh"
#include "com/centreon/broker/stats/center.hh"
Expand All @@ -49,17 +48,16 @@ class muxer;
* The instance initialization/deinitialization are guarded by a mutex
* _load_m. It is only used for that purpose.
*
* This class is the root of events dispatching. Events arrive from a stream
* This class is the root of events dispatching. Events arrive from a stream,
* are transfered to a muxer and then to engine (at the root of the tree).
* This one then sends the event to all its children. Each muxer receives
* the event and sends it to its stream.
* This one then sends events to all its children. Each muxer receives
* these events and sends them to its stream.
*
* The engine has three states:
* * switched off, the 'write' function points to a _nop() function. All event
* that could be received is lost by the engine. This state is possible only
* when the engine is started or during tests.
* * running, the 'write' function points to a _write() function that sends
* received events to all the muxers beside.
* * not started. All event that could be received is lost by the engine.
* This state is possible only when the engine is started or during tests.
* * running, received events are dispatched to all the muxers beside. This
* is done asynchronously.
* * stopped, the 'write' function points to a _write_to_cache_file() funtion.
* When broker is stopped, before it to be totally stopped, events are
* written to a cache file ...unprocessed... This file will be re-read at the
Expand All @@ -68,37 +66,33 @@ class muxer;
* @see muxer
*/
class engine {
static std::mutex _load_m;
static engine* _instance;
std::unique_ptr<persistent_cache> _cache_file;

// Data queue.
std::queue<std::shared_ptr<io::data>> _kiew;
enum state { not_started, running, stopped };
state _state;
asio::io_context::strand _strand;

// Hooks
std::vector<std::pair<hooker*, bool>> _hooks;
std::vector<std::pair<hooker*, bool>>::iterator _hooks_begin;
std::vector<std::pair<hooker*, bool>>::iterator _hooks_end;
std::unique_ptr<persistent_cache> _cache_file;

// Mutex to lock _kiew and _hooks
// Mutex to lock _kiew and _state
std::mutex _engine_m;

// Data queue.
std::deque<std::shared_ptr<io::data>> _kiew;

// Subscriber.
std::vector<muxer*> _muxers;
std::mutex _muxers_m;

// Statistics.
EngineStats* _stats;
uint32_t _unprocessed_events;

static std::mutex _load_m;
std::atomic_bool _sending_to_subscribers;

engine();
std::string _cache_file_path() const;
void _nop(std::shared_ptr<io::data> const& d);
void _send_to_subscribers();
void _write(std::shared_ptr<io::data> const& d);
void _write_to_cache_file(std::shared_ptr<io::data> const& d);
void _publish(std::shared_ptr<io::data> const& d);

void (engine::*_write_func)(std::shared_ptr<io::data> const&);

Expand All @@ -110,13 +104,12 @@ class engine {
engine(const engine&) = delete;
engine& operator=(const engine&) = delete;
~engine() noexcept = default;

void clear();
void publish(const std::shared_ptr<io::data>& d);
void publish(const std::list<std::shared_ptr<io::data>>& to_publish);
void start();
void stop();
void hook(hooker& h, bool with_data = true);
void unhook(hooker& h);
void subscribe(muxer* subscriber);
void unsubscribe(muxer* subscriber);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class muxer : public io::stream {
void wake();
int32_t write(std::shared_ptr<io::data> const& d) override;
int32_t stop() override;
const std::string& name() const;

static std::string memory_file(std::string const& name);
static std::string queue_file(std::string const& name);
Expand Down
Loading